[core][flink] Support parallelism snapshot expire #7027
wzhero1 wants to merge 8 commits into apache:master
Conversation
yunfengzhou-hub left a comment:
Thanks for the PR. I've left some comments below.
Diff context:

    @Override
    public void run() throws Exception {
yunfengzhou-hub: If this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.
wzhero1: I'd prefer to keep LocalAction here. By default (without forceStartFlinkJob), the action runs locally via super.run() → executeLocally(), which is sufficient for normal scenarios and safer. Only when forceStartFlinkJob is set do we start a Flink job with multiple parallelism for large-scale expiration. This also leaves room for future adjustments.
Diff context:

    /** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
    private boolean isParallelMode() {
        return forceStartFlinkJob && parallelism != null && parallelism > 1;
yunfengzhou-hub: Forcing users to set the parallelism of this action might increase their burden to understand Paimon's internals. It might be better to derive the job's parallelism automatically by default.

I noticed that parallelism is mainly used to distribute SnapshotExpireTasks evenly between the RangePartitionedExpireFunction subtasks, so this PR creates a List<List<SnapshotExpireTask>> with parallelism batches. A better fit for Flink might be to have the subtasks fetch directly from the original list, consuming the next task after completing the previous one. On the one hand, this would make it unnecessary to decide the job's parallelism up front. On the other hand, it could also achieve better dynamic rebalancing at runtime in case different SnapshotExpireTasks have different workloads.

Besides, the Flink configuration might be a better place to set the job's parallelism than Action arguments.
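The dynamic-fetching alternative suggested here can be sketched outside Flink with a shared queue that worker threads poll from. This is a minimal illustration only; `ExpireTask`, `runWorkers`, and the worker loop are hypothetical names, not Paimon or Flink API:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DynamicTaskFetchSketch {
    // Hypothetical stand-in for a SnapshotExpireTask: just carries a snapshot id.
    record ExpireTask(long snapshotId) {}

    /**
     * Each worker repeatedly polls the shared queue, so a fast worker automatically
     * picks up more tasks (the dynamic rebalancing the reviewer describes).
     * Returns the total number of tasks processed across all workers.
     */
    static long runWorkers(List<ExpireTask> tasks, int workers) throws Exception {
        Queue<ExpireTask> queue = new ConcurrentLinkedQueue<>(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<Long>> results = IntStream.range(0, workers)
                .mapToObj(i -> pool.submit(() -> {
                    long processed = 0;
                    ExpireTask t;
                    while ((t = queue.poll()) != null) {
                        processed++; // a real worker would delete this task's files here
                    }
                    return processed;
                }))
                .collect(Collectors.toList());
        long total = 0;
        for (Future<Long> f : results) {
            total += f.get();
        }
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<ExpireTask> tasks = IntStream.rangeClosed(1, 100)
                .mapToObj(ExpireTask::new)
                .collect(Collectors.toList());
        System.out.println(runWorkers(tasks, 4)); // every task is processed exactly once
    }
}
```

Note that this design trades away the range locality the PR relies on: consecutive snapshot ids can land on different workers.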
wzhero1: Thanks for the suggestions! I've made --parallelism optional; when not specified, it falls back to env.getParallelism(), so users can control it via the Flink configuration naturally. (Automatic parallelism inference based on workload is out of scope for this PR.)
Regarding the dynamic task fetching approach (env.fromSequence(startId, endId).flatMap(...)), the current design intentionally separates deletion into two phases:
- Worker phase (parallel): deletes data files and changelog files
- Sink phase (serial): deletes manifests and snapshot metadata files
This separation is necessary because:
- manifestSkippingSet is global and mutable: it is built from all tags + endSnapshot manifests. Parallel manifest deletion would require distributed coordination to safely update this set, which is significantly more complex.
- Serial deletion ensures consistency: parallel snapshot deletion could leave gaps (e.g., snapshots 1, 3, 5 exist but 2 and 4 are deleted), which is confusing for users. Serial execution in the sink ensures clean, contiguous deletion.
- Range partitioning maximizes cache locality: createDataFileSkipperForTags caches tag data files internally. Adjacent snapshots typically share the same nearest tag, so processing contiguous ranges avoids expensive cache rebuilds. Dynamic task fetching (each subtask fetching the next available split) would scatter snapshots across subtasks and break this cache benefit.

Data file deletion is the most time-consuming part and is safely parallelizable. Manifest/snapshot deletion is lightweight but requires ordering guarantees, so serial execution in the sink is the right trade-off.
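The two-phase split described in this reply can be illustrated with a minimal, Flink-free sketch. The class and method names here are illustrative assumptions, not Paimon's actual API: data-file work runs on a thread pool in any order, while snapshot metadata is deleted serially in ascending id order, so a crash mid-way leaves a contiguous prefix of history deleted rather than gaps:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TwoPhaseExpireSketch {
    /** Phase 1 (parallel): delete data files of each snapshot; order does not matter. */
    static void deleteDataFiles(List<Long> snapshotIds, int parallelism) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(parallelism);
        List<Future<?>> futures = new ArrayList<>();
        for (long id : snapshotIds) {
            futures.add(workers.submit(() -> {
                // a real worker would delete the data/changelog files of this snapshot
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // wait for all workers before touching any metadata
        }
        workers.shutdown();
    }

    /**
     * Phase 2 (serial): delete snapshot metadata in ascending id order, so the
     * surviving snapshots always form a contiguous range even after a crash.
     * Returns the deletion order for inspection.
     */
    static List<Long> deleteSnapshotsSerially(List<Long> snapshotIds) {
        List<Long> deletionOrder = new ArrayList<>();
        snapshotIds.stream().sorted().forEach(id -> {
            // a real sink would delete manifests + the snapshot file of `id` here
            deletionOrder.add(id);
        });
        return deletionOrder;
    }

    public static void main(String[] args) throws Exception {
        List<Long> toExpire = LongStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        deleteDataFiles(toExpire, 4);
        System.out.println(deleteSnapshotsSerially(toExpire));
    }
}
```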
.../paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java (two outdated review threads, resolved)
Diff context:

    * @param parallelism target parallelism for distribution
    * @return list of task groups, one per worker
    */
    public List<List<SnapshotExpireTask>> partitionTasksBySnapshotRange(int parallelism) {
yunfengzhou-hub: It seems that this PR divides the expire process of a snapshot into different SnapshotExpireTasks first, and then sends the tasks of the same snapshot to the same Flink subtask. If this is the case, why make the division first? Would the following implementation be simpler?

    // calculate the start and end snapshot id before this part of the code
    env.fromSequence(startId, endId)
        .flatMap(new SnapshotExpireFunction()) // deletes the data, manifest, and snapshot files sequentially
        .sinkTo(new SnapshotExpireSink());
wzhero1: The simplified approach (env.fromSequence + flatMap) has two main issues:
- Gap problem: Parallel deletion could leave gaps in snapshot history (e.g., 1, 3, 5 exist but 2, 4 are deleted), which is confusing for users.
- Cache locality: Adjacent snapshots typically share the same nearest tag. Range partitioning allows each subtask to reuse createDataFileSkipperForTags cache, while dynamic fetching would scatter snapshots and break this benefit.
See my response in comment#2 for more details.
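Range-based partitioning of the kind partitionTasksBySnapshotRange performs can be sketched generically; this is a hypothetical simplification (the real method operates on SnapshotExpireTasks, not arbitrary elements). Each worker receives one contiguous slice, so adjacent snapshots, which tend to share the same nearest tag, stay on the same subtask:

```java
import java.util.ArrayList;
import java.util.List;

public class RangePartitionSketch {
    /**
     * Splits `tasks` (assumed ordered by snapshot id) into at most `parallelism`
     * contiguous slices whose sizes differ by at most one.
     */
    static <T> List<List<T>> partitionByRange(List<T> tasks, int parallelism) {
        List<List<T>> groups = new ArrayList<>();
        int n = tasks.size();
        int workers = Math.min(parallelism, Math.max(n, 1));
        int base = n / workers;      // minimum slice size
        int remainder = n % workers; // the first `remainder` slices get one extra task
        int start = 0;
        for (int i = 0; i < workers; i++) {
            int size = base + (i < remainder ? 1 : 0);
            groups.add(new ArrayList<>(tasks.subList(start, start + size)));
            start += size;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(partitionByRange(ids, 3)); // [[1, 2, 3], [4, 5], [6, 7]]
    }
}
```

Because slices are contiguous, each subtask can build its tag data-file skipper cache once per range instead of rebuilding it on every task.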
...ink/paimon-flink-common/src/main/java/org/apache/paimon/flink/expire/SnapshotExpireSink.java (two outdated review threads, resolved)
paimon-core/src/main/java/org/apache/paimon/operation/expire/DeletionReport.java (outdated review thread, resolved)
Diff context:

    ExpireSnapshotsPlan plan = planner.plan(expireConfig);
    if (plan.isEmpty()) {
        LOG.info("No snapshots to expire");
        return;
yunfengzhou-hub: Better to configure an empty Flink job, even if the job would complete immediately after submission. This is what "force_start_flink_job" means.
wzhero1: I'd prefer to keep the early return here. Submitting an empty Flink job still involves cluster resource allocation and scheduling overhead for zero useful work. I've improved the log message to make the skip behavior clear to users.
Diff context:

    plan.snapshotFileTasks(),
    expireConfig.isChangelogDecoupled()))
    .setParallelism(1)
    .name("SnapshotExpire");
yunfengzhou-hub: This operator name can be improved. The upstream operator is also doing snapshot-expire work.
wzhero1: Renamed to "SnapshotExpireCommit" to distinguish it from the upstream worker phase.
Diff context:

    private DeletionReport processTask(SnapshotExpireTask task) {
yunfengzhou-hub: This is a little over-designed: introducing a method for two lines of code that is used only once. It can be put directly into the flatMap method.
wzhero1: Done. Inlined processTask into flatMap.
Diff context:

    public void flatMap(List<SnapshotExpireTask> tasks, Collector<DeletionReport> out)
            throws Exception {
        // Process tasks sequentially in order to maximize cache locality
        for (SnapshotExpireTask task : tasks) {
yunfengzhou-hub: Better to add some logs to allow users to track the current expiration progress.
wzhero1: Done. Added progress logging: batch-level start/end at INFO with elapsed time, and per-task detail with task type, snapshot ID, and a progress counter.
Diff context:

    * the serial implementation. It extends the production {@link RangePartitionedExpireFunction} and
    * {@link SnapshotExpireSink} classes, overriding {@code initExecutor} to inject test executors.
    */
    public class ExpireSnapshotsActionITCase extends ExpireSnapshotsTest {
yunfengzhou-hub: Can we add a test case verifying that, if the snapshot expiration job fails accidentally in the middle, it can still complete the expiration after recovery?
wzhero1: Done. Added testExpireRecoveryAfterPartialFailure: it uses a failAfterTasks flag in TestExpireFunction to simulate a mid-job crash during the worker phase (some data files deleted, but snapshot metadata intact), then runs expire again and verifies it completes successfully despite the missing files.
Diff context:

    import java.util.function.Supplier;

    /**
     * IT Case for parallel expire snapshots using Flink execution. This class extends {@link
yunfengzhou-hub: Given that this ITCase is specifically designed to test parallel mode, it would be better to rename this test class to reflect that.
wzhero1: Done. Renamed ExpireSnapshotsActionITCase to ParallelExpireSnapshotsActionITCase to clarify that this test class is specifically for parallel mode.
Diff context:

    @Test
    @Disabled(
            "Tests concurrent expire scenario which has different semantics in Flink parallel mode")
yunfengzhou-hub: Could you please elaborate on how the two modes are different, and whether this difference affects the expected expiration result for users?
Diff context:

    case DELETE_MANIFESTS:
        return executeDeleteManifests(task, snapshot, skippingSet);
    case DELETE_SNAPSHOT:
        return executeDeleteSnapshot(task, snapshot);
yunfengzhou-hub: It might be better to convert the 4 TaskTypes into 4 subclasses of SnapshotExpireTask. The 4 switch branches here could become a method on the SnapshotExpireTask interface/abstract class, with a different implementation in each of the 4 subclasses.
wzhero1: Refactored as suggested. Replaced the ExpireSnapshotsExecutor switch-based dispatch with polymorphic SnapshotExpireTask subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask). Each subclass encapsulates its own deletion logic via execute(ExpireContext). Also simplified ExpireContext by removing delegate methods; callers now access the underlying objects directly (e.g., context.snapshotDeletion().cleanUnusedDataFiles()). taggedSnapshots and snapshotCache are now constructor parameters of ExpireContext instead of setters.
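The switch-to-polymorphism refactor discussed in this thread follows a standard pattern. Below is a minimal sketch with deliberately simplified, hypothetical signatures (the real tasks carry file lists and a much richer ExpireContext); each task type owns its deletion logic, so no central switch on a TaskType enum is needed:

```java
public class PolymorphicTaskSketch {
    /** Simplified stand-in for the runtime dependencies each task needs. */
    record ExpireContext(long snapshotId) {}

    /** Each task type implements its own deletion logic; no central switch. */
    abstract static class SnapshotExpireTask {
        abstract String execute(ExpireContext context);
    }

    static class DeleteDataFilesTask extends SnapshotExpireTask {
        @Override
        String execute(ExpireContext ctx) {
            return "deleted data files of snapshot " + ctx.snapshotId();
        }
    }

    static class DeleteSnapshotTask extends SnapshotExpireTask {
        @Override
        String execute(ExpireContext ctx) {
            return "deleted snapshot file " + ctx.snapshotId();
        }
    }

    public static void main(String[] args) {
        ExpireContext ctx = new ExpireContext(42);
        // The caller iterates tasks uniformly instead of branching on an enum:
        SnapshotExpireTask[] tasks = {new DeleteDataFilesTask(), new DeleteSnapshotTask()};
        for (SnapshotExpireTask task : tasks) {
            System.out.println(task.execute(ctx));
        }
    }
}
```

Adding a new task type then means adding one subclass rather than editing a shared switch statement.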
Diff context:

    * <li>Phase 2b: Delete all snapshot files (using snapshotFileTasks)
    * </ul>
    */
    public class ExpireSnapshotsImpl implements ExpireSnapshots {
yunfengzhou-hub: It might be better to avoid a complete restructure of ExpireSnapshotsImpl if it has been working well. Instead, most of the logic can live in paimon-flink.
wzhero1: Done. Removed the complete restructure of ExpireSnapshotsImpl.
Purpose
This PR implements parallel snapshot expiration using Flink distributed execution to improve the performance of large-scale cleanup operations.
Architecture:
The expire process is split into two phases:
- Worker phase (parallel): RangePartitionedExpireFunction. Tasks are partitioned by snapshot ID range to maximize cache locality (adjacent snapshots share manifest files and tag data file skippers).
- Sink phase (serial): SnapshotExpireSink, because manifestSkippingSet is global mutable state and snapshot deletion must be contiguous.

Key classes:
- ExpireSnapshotsPlanner: computes the expiration plan, including the snapshot range, four types of tasks, the protection set, and the snapshot cache
- SnapshotExpireTask: abstract base with 4 polymorphic subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask)
- ExpireContext: holds runtime dependencies (SnapshotManager, SnapshotDeletion, ChangelogManager) and shared state (taggedSnapshots, snapshotCache, skippingSet)
- ExpireSnapshotsPlan: contains task lists, the protection set, the snapshot cache, and the range-based task partitioning logic
- DeletionReport: carries deletion bucket info from workers to the sink for empty-directory cleanup

Execution modes:
- forceStartFlinkJob=false and parallelism <= 1: executes locally via ExpireSnapshotsProcedure
- forceStartFlinkJob=true or parallelism > 1: uses Flink distributed execution

Other changes:
- Reads snapshots as CompletableFuture and caches them, serialized to both workers and sink to avoid redundant reads
- ExpireSnapshotsImpl (serial mode) reuses ExpireSnapshotsPlanner for planning, keeping the serial execution path unchanged

Tests
Unit Tests:
- ExpireSnapshotsPlanTest: tests the range-based task partitioning logic
- DeletionReportTest: tests deletion report serialization

Integration Tests:
- ParallelExpireSnapshotsActionITCase: extends ExpireSnapshotsTest to validate that parallel mode produces the same results as serial mode, plus a failure recovery test
- ExpireSnapshotsProcedureITCase: tests both local and parallel modes via the procedure

API and Format
New CLI parameter for the expire-snapshots action: --parallelism
No storage format changes.
Backward compatible: default behavior (local mode) remains unchanged.
Documentation
Yes, this introduces a new feature: parallel snapshot expiration.
Documentation should cover:
- the --parallelism parameter for the expire-snapshots action
- --parallelism > 1 or --force_start_flink_job triggers parallel mode