diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java b/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
index b5d7fdcdb8..ef0766ed47 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
@@ -59,6 +59,16 @@ public PartitionGetter(RowType rowType, List partitionKeys) {
     }
 
     public String getPartition(InternalRow row) {
+        return getResolvedPartitionSpec(row).getPartitionName();
+    }
+
+    /**
+     * Resolves the partition spec of the given row from its partition key fields.
+     *
+     * @param row the row to read the partition key values from
+     * @return the spec built from the partition keys and the converted partition values
+     */
+    public ResolvedPartitionSpec getResolvedPartitionSpec(InternalRow row) {
         List<String> partitionValues = new ArrayList<>();
         for (int i = 0; i < partitionFieldGetters.size(); i++) {
             InternalRow.FieldGetter partitionFieldGetter = partitionFieldGetters.get(i);
@@ -68,8 +78,6 @@ public String getPartition(InternalRow row) {
             partitionValues.add(
                     PartitionUtils.convertValueOfType(partitionValue, dataType.getTypeRoot()));
         }
-        ResolvedPartitionSpec resolvedPartitionSpec =
-                new ResolvedPartitionSpec(partitionKeys, partitionValues);
-        return resolvedPartitionSpec.getPartitionName();
+        return new ResolvedPartitionSpec(partitionKeys, partitionValues);
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 24a8d00d59..24f3d94d67 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -18,11 +18,13 @@
 package org.apache.fluss.flink.source;
 
 import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.client.table.getter.PartitionGetter;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.StatisticsColumnsConfig;
 import org.apache.fluss.config.TableConfig;
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.row.FlinkAsFlussRow;
 import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema;
 import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction;
 import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
@@ -38,6 +40,7 @@
 import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
+import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.CompoundPredicate;
 import org.apache.fluss.predicate.PartitionPredicateVisitor;
@@ -594,7 +597,8 @@ && hasPrimaryKey()
 
                 // if not all primary key fields are in condition, fall through to
                 // try partition filter pushdown for partitioned PK tables
-                if (visitedPkFields.equals(primaryKeyTypes.keySet())) {
+                if (visitedPkFields.equals(primaryKeyTypes.keySet())
+                        && lookupCoversAllData(lookupRow)) {
                     singleRowFilter = lookupRow;
                     // FLINK-38635: return all filters as remaining for scan vs lookup safety net
                     return Result.of(acceptedFilters, filters);
@@ -889,6 +893,34 @@ private int[] getKeyRowProjection() {
         return projection;
     }
 
+    /**
+     * Checks whether a single-row lookup on {@code lookupRow} can be pushed down safely.
+     *
+     * <p>Returns {@code true} when the data lake is disabled or the table is not partitioned;
+     * otherwise returns whether the partition resolved from {@code lookupRow} currently exists.
+     *
+     * @param lookupRow the lookup key, laid out as the projected primary key fields
+     * @return {@code true} if the single-row lookup may be used
+     */
+    private boolean lookupCoversAllData(GenericRowData lookupRow) {
+        if (!isDataLakeEnabled || !isPartitioned()) {
+            return true;
+        }
+        // TODO: drop this gate once FIP-28 lets the lookup path read expired partitions from the
+        // lake; then always push the single-row lookup down instead of falling back to a scan.
+        // Partition keys are a subset of the primary key, so the partition resolves from lookupRow.
+        RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+        PartitionGetter partitionGetter =
+                new PartitionGetter(
+                        flussRowType.project(primaryKeyIndexes),
+                        flussRowType.project(partitionKeyIndexes).getFieldNames());
+        PartitionSpec partitionSpec =
+                partitionGetter
+                        .getResolvedPartitionSpec(new FlinkAsFlussRow(lookupRow))
+                        .toPartitionSpec();
+        return PushdownUtils.partitionExists(tablePath, flussConfig, partitionSpec);
+    }
+
     @VisibleForTesting
     @Nullable
     public LookupCache getCache() {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 1afae9516b..0c6be61d82 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -31,6 +31,7 @@
 import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metadata.TableStats;
@@ -334,6 +335,31 @@ public static Collection limitScan(
         }
     }
 
+    /**
+     * Checks whether the table currently has at least one partition matching the given spec.
+     *
+     * <p>Opens a short-lived connection for the check and closes it before returning.
+     *
+     * @param tablePath the path of the table to inspect
+     * @param flussConfig the configuration used to create the connection
+     * @param partitionSpec the partition spec to look for
+     * @return {@code true} if listing partitions for the spec returns a non-empty result
+     * @throws FlussRuntimeException if the connection or the partition listing fails
+     */
+    public static boolean partitionExists(
+            TablePath tablePath, Configuration flussConfig, PartitionSpec partitionSpec) {
+        try (Connection connection = ConnectionFactory.createConnection(flussConfig);
+                Admin flussAdmin = connection.getAdmin()) {
+            return !flussAdmin.listPartitionInfos(tablePath, partitionSpec).get().isEmpty();
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new FlussRuntimeException(e);
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+    }
+
     public static long countTable(TablePath tablePath, Configuration flussConfig) {
         try (Connection connection = ConnectionFactory.createConnection(flussConfig);
                 Admin flussAdmin = connection.getAdmin()) {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 9837cbcd34..81fa9b0501 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -1119,6 +1119,84 @@ void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
         jobClient.cancel().get();
     }
 
+    /**
+     * Drops one partition in Fluss after tiering has synced the table, then runs a point query
+     * against both the dropped partition and a partition that still exists.
+     */
+    @Test
+    void testPointLookupOnExpiredPartitionReadsFromLake() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        boolean tieringCancelled = false;
+        try {
+            String tableName = "point_lookup_expired_partition_pk_table";
+            TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+            Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            Function<String, List<InternalRow>> rowGenerator =
+                    (partition) ->
+                            Arrays.asList(
+                                    row(3, "string", partition),
+                                    row(30, "another_string", partition));
+            long tableId =
+                    prepareSimplePKTable(
+                            tablePath, DEFAULT_BUCKET_NUM, true, rowGenerator, bucketLogEndOffset);
+
+            waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
+
+            Map<Long, String> partitionNameByIds = waitUntilPartitions(tablePath);
+            assertThat(partitionNameByIds.size()).isGreaterThanOrEqualTo(2);
+
+            // stop tiering so the read is served from the lake snapshot; the per-job MiniCluster
+            // shuts down with the job, so cancel exactly once
+            jobClient.cancel().get();
+            tieringCancelled = true;
+
+            Iterator<String> partitionIterator = partitionNameByIds.values().iterator();
+            String expiredPartition = partitionIterator.next();
+            String livePartition = partitionIterator.next();
+
+            admin.dropPartition(
+                            tablePath,
+                            new PartitionSpec(Collections.singletonMap("c3", expiredPartition)),
+                            false)
+                    .get();
+            retry(
+                    Duration.ofSeconds(60),
+                    () ->
+                            assertThat(admin.listPartitionInfos(tablePath).get())
+                                    .noneMatch(p -> expiredPartition.equals(p.getPartitionName())));
+
+            List<String> expiredResult =
+                    collectBatchRows(
+                            batchTEnv
+                                    .executeSql(
+                                            String.format(
+                                                    "select * from %s where c1 = 3 and c3 = '%s'",
+                                                    tableName, expiredPartition))
+                                    .collect());
+            assertThat(expiredResult)
+                    .as("point query on a lake-only (expired) partition must read from the lake")
+                    .containsExactly(String.format("+I[3, string, %s]", expiredPartition));
+
+            List<String> liveResult =
+                    collectBatchRows(
+                            batchTEnv
+                                    .executeSql(
+                                            String.format(
+                                                    "select * from %s where c1 = 3 and c3 = '%s'",
+                                                    tableName, livePartition))
+                                    .collect());
+            assertThat(liveResult)
+                    .as("point query on a live partition must still return its row")
+                    .containsExactly(String.format("+I[3, string, %s]", livePartition));
+        } finally {
+            // only cancel here if setup failed before the intended cancel above
+            if (!tieringCancelled) {
+                jobClient.cancel().get();
+            }
+        }
+    }
+
     @Test
     void testPartitionFilterOnPartitionedTableInBatch() throws Exception {
         // first of all, start tiering