Skip to content

Commit b9631ee

Browse files
authored
[flink] Support refreshing partition asynchronously for lookup join (#7402)
1 parent 9d7c54f commit b9631ee

File tree

7 files changed

+611
-14
lines changed

7 files changed

+611
-14
lines changed

.github/workflows/utitcase-flink-1.x-common.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ concurrency:
3838
jobs:
3939
build_test:
4040
runs-on: ubuntu-latest
41-
timeout-minutes: 60
41+
timeout-minutes: 100
4242

4343
steps:
4444
- name: Checkout code

docs/layouts/shortcodes/generated/flink_connector_configuration.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@
8080
<td>Duration</td>
8181
<td>Specific dynamic partition refresh interval for lookup, scan all partitions and obtain corresponding partition.</td>
8282
</tr>
83+
<tr>
84+
<td><h5>lookup.dynamic-partition.refresh.async</h5></td>
85+
<td style="word-wrap: break-word;">false</td>
86+
<td>Boolean</td>
87+
<td>Whether to refresh dynamic partition lookup table asynchronously. This option only works for full cache dimension table. When enabled, partition changes will be loaded in a background thread while the old partition data continues serving queries. When disabled (default), partition refresh is synchronous and blocks queries until the new partition data is fully loaded.</td>
88+
</tr>
8389
<tr>
8490
<td><h5>lookup.refresh.async</h5></td>
8591
<td style="word-wrap: break-word;">false</td>
@@ -357,4 +363,4 @@
357363
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
358364
</tr>
359365
</tbody>
360-
</table>
366+
</table>

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,18 @@ public class FlinkConnectorOptions {
276276
.defaultValue(false)
277277
.withDescription("Whether to refresh lookup table in an async thread.");
278278

279+
public static final ConfigOption<Boolean> LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC =
280+
ConfigOptions.key("lookup.dynamic-partition.refresh.async")
281+
.booleanType()
282+
.defaultValue(false)
283+
.withDescription(
284+
"Whether to refresh dynamic partition lookup table asynchronously. "
285+
+ "This option only works for full cache dimension table. "
286+
+ "When enabled, partition changes will be loaded in a background thread "
287+
+ "while the old partition data continues serving queries. "
288+
+ "When disabled (default), partition refresh is synchronous and blocks queries "
289+
+ "until the new partition data is fully loaded.");
290+
279291
public static final ConfigOption<Integer> LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
280292
ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
281293
.intType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
7070
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
71+
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC;
7172
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
7273
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
7374
import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
@@ -93,8 +94,12 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
9394
private final List<InternalRow.FieldGetter> projectFieldsGetters;
9495

9596
private transient File path;
97+
private transient String tmpDirectory;
9698
private transient LookupTable lookupTable;
9799

100+
// partition refresh
101+
@Nullable private transient PartitionRefresher partitionRefresher;
102+
98103
// interval of refreshing lookup table
99104
private transient Duration refreshInterval;
100105
// timestamp when refreshing lookup table
@@ -161,7 +166,7 @@ public FileStoreLookupFunction(
161166

162167
public void open(FunctionContext context) throws Exception {
163168
this.functionContext = context;
164-
String tmpDirectory = getTmpDirectory(context);
169+
this.tmpDirectory = getTmpDirectory(context);
165170
open(tmpDirectory);
166171
}
167172

@@ -236,6 +241,16 @@ private void open() throws Exception {
236241
lookupTable.specifyPartitions(
237242
partitions, partitionLoader.createSpecificPartFilter());
238243
}
244+
if (partitionLoader instanceof DynamicPartitionLoader) {
245+
// Initialize partition refresher
246+
this.partitionRefresher =
247+
new PartitionRefresher(
248+
options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC)
249+
&& lookupTable instanceof FullCacheLookupTable,
250+
table.name(),
251+
this.tmpDirectory,
252+
partitionLoader.partitions());
253+
}
239254
}
240255

241256
if (cacheRowFilter != null) {
@@ -271,13 +286,16 @@ public Collection<RowData> lookup(RowData keyRow) {
271286
if (partitionLoader == null) {
272287
return lookupInternal(key);
273288
}
274-
275-
if (partitionLoader.partitions().isEmpty()) {
289+
List<BinaryRow> partitions =
290+
partitionRefresher != null
291+
? partitionRefresher.currentPartitions()
292+
: partitionLoader.partitions();
293+
if (partitions.isEmpty()) {
276294
return Collections.emptyList();
277295
}
278296

279297
List<RowData> rows = new ArrayList<>();
280-
for (BinaryRow partition : partitionLoader.partitions()) {
298+
for (BinaryRow partition : partitions) {
281299
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
282300
}
283301
return rows;
@@ -324,7 +342,18 @@ void tryRefresh() throws Exception {
324342
return;
325343
}
326344

327-
// 2. refresh dynamic partition
345+
// 2. check if async partition refresh has completed, and switch if so
346+
if (partitionRefresher != null && partitionRefresher.isPartitionRefreshAsync()) {
347+
LookupTable newLookupTable =
348+
partitionRefresher.getNewLookupTable(partitionLoader.partitions());
349+
if (newLookupTable != null) {
350+
lookupTable.close();
351+
lookupTable = newLookupTable;
352+
path = partitionRefresher.path();
353+
}
354+
}
355+
356+
// 3. refresh dynamic partition
328357
if (partitionLoader != null) {
329358
boolean partitionChanged = partitionLoader.checkRefresh();
330359
List<BinaryRow> partitions = partitionLoader.partitions();
@@ -334,18 +363,17 @@ void tryRefresh() throws Exception {
334363
}
335364

336365
if (partitionChanged) {
337-
// reopen with latest partition
338-
lookupTable.specifyPartitions(
339-
partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
340-
lookupTable.close();
341-
lookupTable.open();
366+
partitionRefresher.startRefresh(
367+
partitions,
368+
partitionLoader.createSpecificPartFilter(),
369+
lookupTable,
370+
cacheRowFilter);
342371
nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
343-
// no need to refresh the lookup table because it is reopened
344372
return;
345373
}
346374
}
347375

348-
// 3. refresh lookup table
376+
// 4. refresh lookup table
349377
if (shouldRefreshLookupTable()) {
350378
// Check if we should do full load (close and reopen table) instead of incremental
351379
// refresh
@@ -415,6 +443,10 @@ long nextBlacklistCheckTime() {
415443

416444
@Override
417445
public void close() throws IOException {
446+
if (partitionRefresher != null) {
447+
partitionRefresher.close();
448+
}
449+
418450
if (lookupTable != null) {
419451
lookupTable.close();
420452
lookupTable = null;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,5 +414,16 @@ public Context copy(int[] newProjection) {
414414
joinKey,
415415
requiredCachedBucketIds);
416416
}
417+
418+
public Context copy(File newTempPath) {
419+
return new Context(
420+
table.wrapped(),
421+
projection,
422+
tablePredicate,
423+
projectedPredicate,
424+
newTempPath,
425+
joinKey,
426+
requiredCachedBucketIds);
427+
}
417428
}
418429
}

0 commit comments

Comments
 (0)