
[core][flink] Support parallelism snapshot expire #7027

Open
wzhero1 wants to merge 8 commits into apache:master from wzhero1:feat/paimon-expire-snapshot-parallel-opt

Conversation

@wzhero1 (Contributor) commented Jan 13, 2026

Purpose

This PR implements parallel snapshot expiration using Flink distributed execution to improve the performance of large-scale cleanup operations.

Motivation:

  • Serial file deletion becomes a performance bottleneck for tables with large amounts of data
  • Current implementation cannot leverage Flink's distributed computing capabilities

Architecture:

The expire process is split into two phases:

  • Worker phase (parallel flatMap): Deletes data files and changelog files in parallel using RangePartitionedExpireFunction. Tasks are partitioned by snapshot ID range to maximize cache locality (adjacent snapshots share manifest files and tag data file skippers).
  • Sink phase (serial): Deletes manifests and snapshot metadata serially using SnapshotExpireSink, because manifestSkippingSet is global mutable state and snapshot deletion must be contiguous.

Key classes:

  • ExpireSnapshotsPlanner: Computes expiration plan including snapshot range, four types of tasks, protection set, and snapshot cache
  • SnapshotExpireTask: Abstract base with 4 polymorphic subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask)
  • ExpireContext: Holds runtime dependencies (SnapshotManager, SnapshotDeletion, ChangelogManager) and shared state (taggedSnapshots, snapshotCache, skippingSet)
  • ExpireSnapshotsPlan: Contains task lists, protection set, snapshot cache, and range-based task partitioning logic
  • DeletionReport: Carries deletion bucket info from workers to sink for empty directory cleanup
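The polymorphic task design can be sketched roughly as follows. This is an illustrative simplification, not the actual Paimon code: the real subclasses carry file lists and call into SnapshotDeletion/ChangelogManager through ExpireContext, while here the context and method bodies are stubs that only record a trace.

```java
import java.io.Serializable;

// Stub standing in for ExpireContext; the real one holds SnapshotManager,
// SnapshotDeletion, ChangelogManager and shared state.
class ExpireContext {
    final StringBuilder trace = new StringBuilder();
}

abstract class SnapshotExpireTask implements Serializable {
    final long snapshotId;

    SnapshotExpireTask(long snapshotId) {
        this.snapshotId = snapshotId;
    }

    // Replaces switch-based dispatch: each subclass owns its deletion step.
    abstract void execute(ExpireContext context);
}

class DeleteDataFilesTask extends SnapshotExpireTask {
    DeleteDataFilesTask(long id) {
        super(id);
    }

    @Override
    void execute(ExpireContext ctx) {
        // Real code: ctx.snapshotDeletion().cleanUnusedDataFiles(...)
        ctx.trace.append("data:").append(snapshotId).append(';');
    }
}

class DeleteManifestsTask extends SnapshotExpireTask {
    DeleteManifestsTask(long id) {
        super(id);
    }

    @Override
    void execute(ExpireContext ctx) {
        // Real code consults the (mutable) manifest skipping set first.
        ctx.trace.append("manifests:").append(snapshotId).append(';');
    }
}
```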

Execution modes:

  • Local mode: forceStartFlinkJob=false and parallelism <= 1 → executes locally via ExpireSnapshotsProcedure
  • Parallel mode: forceStartFlinkJob=true or parallelism > 1 → uses Flink distributed execution
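The mode selection boils down to a single predicate. The helper below is hypothetical (the actual action class's fields and method names may differ); it only captures the rule stated above.

```java
// Hypothetical helper capturing the mode-selection rule; not the actual
// action class API.
class ExpireModeSelector {
    static boolean useParallelMode(boolean forceStartFlinkJob, Integer parallelism) {
        // Either condition enters the Flink distributed pipeline; otherwise
        // the action executes locally via the procedure.
        return forceStartFlinkJob || (parallelism != null && parallelism > 1);
    }
}
```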

Other changes:

  • Moved planner/plan/task/report classes from paimon-core to paimon-flink (per reviewer suggestion)
  • Planner pre-collects snapshots in parallel using CompletableFuture and caches them, serialized to both workers and sink to avoid redundant reads
  • ExpireSnapshotsImpl (serial mode) reuses ExpireSnapshotsPlanner for planning, keeping serial execution path unchanged
  • Added failure recovery test: verifies that if expire job fails midway (data files partially deleted but snapshot metadata intact), a subsequent run completes successfully
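The parallel pre-collection idea can be sketched like this. Everything here is a stand-in: the real planner reads Snapshot objects through SnapshotManager, while this sketch fakes the read with a string so it stays self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class SnapshotPrefetcher {
    // Simulated snapshot read; the real code would read a snapshot file.
    static String readSnapshot(long id) {
        return "snapshot-" + id;
    }

    // Reads all snapshots in [startId, endId] concurrently and returns a
    // cache keyed by snapshot id, to be serialized to workers and sink.
    static Map<Long, String> prefetch(long startId, long endId) {
        List<CompletableFuture<String>> futures =
                LongStream.rangeClosed(startId, endId)
                        .mapToObj(id -> CompletableFuture.supplyAsync(() -> readSnapshot(id)))
                        .collect(Collectors.toList());
        Map<Long, String> cache = new TreeMap<>();
        long id = startId;
        for (CompletableFuture<String> future : futures) {
            cache.put(id++, future.join());
        }
        return cache;
    }
}
```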

Tests

Unit Tests:

  • ExpireSnapshotsPlanTest - Tests range-based task partitioning logic
  • DeletionReportTest - Tests deletion report serialization

Integration Tests:

  • ParallelExpireSnapshotsActionITCase - Extends ExpireSnapshotsTest to validate parallel mode produces same results as serial mode, plus failure recovery test
  • ExpireSnapshotsProcedureITCase - Tests both local and parallel modes via procedure

API and Format

New CLI parameter for expire-snapshots action:

Parameter      Type     Default          Description
--parallelism  Integer  env parallelism  Parallelism for the parallel expire workers

No storage format changes.

Backward compatible: Default behavior (local mode) remains unchanged.

Documentation

Yes, this introduces a new feature: parallel snapshot expiration.

Documentation should cover:

  • New --parallelism parameter for expire-snapshots action
  • Either --parallelism > 1 or --force_start_flink_job triggers parallel mode

@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from 69b0a18 to f61b9b4 Compare January 13, 2026 11:13
@wzhero1 wzhero1 changed the title [flink] Support parallelism snapshot expire [core][flink] Support parallelism snapshot expire Jan 13, 2026
@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from f61b9b4 to be7a32b Compare January 13, 2026 12:19
@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from be7a32b to 59788fc Compare January 26, 2026 05:49
@yunfengzhou-hub (Contributor) left a comment:

Thanks for the PR. Left some comments as below.

}

@Override
public void run() throws Exception {
Contributor:

Since this method is overridden, this class no longer needs to implement LocalAction. You can still keep the executeLocally method for internal use.

Contributor (Author):

Prefer to keep LocalAction here. By default (without forceStartFlinkJob), the action runs locally via super.run() → executeLocally(), which is sufficient for normal scenarios and safer. Only when forceStartFlinkJob is set do we start a Flink job with multi-parallelism for large-scale expiration. This also leaves room for future adjustments.


/** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
private boolean isParallelMode() {
return forceStartFlinkJob && parallelism != null && parallelism > 1;
Contributor:

Forcing users to set the parallelism of this action might increase their burden to understand the logic of Paimon. It might be better to derive the parallelism of the job automatically on its own by default.

I noticed that parallelism is mainly used to distribute SnapshotExpireTasks evenly between the RangePartitionedExpireFunction subtasks, so this PR created a List<List<SnapshotExpireTask>> with parallelism batches. Compared with this design, a better way for Flink might be to have the subtasks directly fetch from the original list, continuing to consume the next task after completing the former one. On the one hand, this would make it unnecessary to decide the parallelism of the job from the beginning. On the other hand, this could also achieve better dynamic rebalancing during runtime in case different SnapshotExpireTasks have different workloads.

Besides, the Flink configuration might be a better place to set the job's parallelism, compared with Action arguments.

@wzhero1 (Contributor, Author) commented Mar 12, 2026:

Thanks for the suggestions! I've made --parallelism optional — when not specified, it falls back to env.getParallelism(), so users can control it via Flink configuration naturally. (Automatic parallelism inference based on workload is out of scope for this PR.)

Regarding the dynamic task fetching approach (env.fromSequence(startId, endId).flatMap(...)), the current design intentionally separates deletion into two phases:

  • Worker phase (parallel): deletes data files and changelog files
  • Sink phase (serial): deletes manifests and snapshot metadata files

This separation is necessary because:

  1. manifestSkippingSet is global and mutable: Built from all tags + endSnapshot manifests. Parallel manifest deletion would require distributed coordination to safely update this set — significantly more complex.
  2. Serial deletion ensures consistency: Parallel snapshot deletion could leave gaps (e.g., snapshots 1, 3, 5 exist but 2, 4 are deleted), which is confusing for users. Serial execution in the sink ensures clean, contiguous deletion.
  3. Range partitioning maximizes cache locality: createDataFileSkipperForTags caches tag data files internally. Adjacent snapshots typically share the same nearest tag, so processing contiguous ranges avoids expensive cache rebuilds. Dynamic task fetching (each subtask fetching the next available split) would scatter snapshots across subtasks and break this cache benefit.

Data file deletion is the most time-consuming part and is safely parallelizable. Manifest/snapshot deletion is lightweight but requires ordering guarantees, so serial execution in the sink is the right trade-off.
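The contiguous-range partitioning described above can be sketched as follows. This is a hypothetical simplification: it only splits the snapshot-ID interval into balanced contiguous ranges, whereas the real partitionTasksBySnapshotRange groups the actual SnapshotExpireTask objects.

```java
import java.util.ArrayList;
import java.util.List;

class RangePartitioner {
    // Splits snapshot IDs [startId, endId] into at most `parallelism`
    // contiguous ranges, so adjacent snapshots (which tend to share the
    // same nearest tag and manifest files) land on the same subtask.
    static List<long[]> partition(long startId, long endId, int parallelism) {
        long total = endId - startId + 1;
        int batches = (int) Math.min(parallelism, total);
        long base = total / batches;
        long remainder = total % batches;
        List<long[]> ranges = new ArrayList<>();
        long cursor = startId;
        for (int i = 0; i < batches; i++) {
            // Spread the remainder over the first `remainder` ranges.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {cursor, cursor + size - 1});
            cursor += size;
        }
        return ranges;
    }
}
```

With 10 snapshots and parallelism 3 this yields ranges of sizes 4/3/3, each range covering a contiguous ID span.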

* @param parallelism target parallelism for distribution
* @return list of task groups, one per worker
*/
public List<List<SnapshotExpireTask>> partitionTasksBySnapshotRange(int parallelism) {
Contributor:

It seems that this PR divides the expire process of a snapshot into different SnapshotExpireTasks first, and then sends the tasks of the same snapshot to the same Flink subtask. If this is the case, why should this PR make the division first? Would the following implementation be simpler?

// calculates the start and end snapshot id before this part of code

env.fromSequence(startId, endId)
    .flatMap(new SnapshotExpireFunction()) // deletes the data, manifest, and snapshot file sequentially
    .sinkTo(new SnapshotExpireSink());

Contributor (Author):

The simplified approach (env.fromSequence + flatMap) has two main issues:

  1. Gap problem: Parallel deletion could leave gaps in snapshot history (e.g., 1, 3, 5 exist but 2, 4 are deleted), which is confusing for users.
  2. Cache locality: Adjacent snapshots typically share the same nearest tag. Range partitioning allows each subtask to reuse createDataFileSkipperForTags cache, while dynamic fetching would scatter snapshots and break this benefit.

See my response in comment #2 for more details.

@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch 3 times, most recently from b1d2e49 to 344f5c8 Compare March 12, 2026 07:19
ExpireSnapshotsPlan plan = planner.plan(expireConfig);
if (plan.isEmpty()) {
LOG.info("No snapshots to expire");
return;
Contributor:

Better to configure an empty Flink job, even if the job would complete immediately after submission. This is what "force_start_flink_job" means.

Contributor (Author):

Prefer to keep the early return here. Submitting an empty Flink job still involves cluster resource allocation and scheduling overhead for zero useful work. I've improved the log message to make the skip behavior clear to users.

plan.snapshotFileTasks(),
expireConfig.isChangelogDecoupled()))
.setParallelism(1)
.name("SnapshotExpire");
Contributor:

This operator name can be improved. The upstream operator is also doing snapshot-expire work.

Contributor (Author):

Renamed to "SnapshotExpireCommit" to distinguish from the upstream worker phase.

}
}

private DeletionReport processTask(SnapshotExpireTask task) {
Contributor:

It's a little over-designed here, introducing a method for two lines of code that is only used once. It can be directly put into the flatMap method.

Contributor (Author):

Done. Inlined processTask into flatMap.

public void flatMap(List<SnapshotExpireTask> tasks, Collector<DeletionReport> out)
throws Exception {
// Process tasks sequentially in order to maximize cache locality
for (SnapshotExpireTask task : tasks) {
Contributor:

Better to add some logs so users can track the current expiration progress.

Contributor (Author):

Done. Added progress logging: batch-level start/end at INFO with elapsed time, per-task detail with task type, snapshot ID and progress counter.
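The logging shape described in this reply can be sketched as follows. Assumptions are flagged in comments: the real code presumably logs via SLF4J at INFO, while this sketch collects the messages in a list so it stays self-contained; the message wording follows the "Processing expire task {}/{}" format quoted later in the thread.

```java
import java.util.ArrayList;
import java.util.List;

class ExpireProgressLogger {
    // Simulates the worker-batch loop; returns the log lines instead of
    // writing them to a logger (assumption for self-containment).
    static List<String> processBatch(List<Long> snapshotIds) {
        List<String> logs = new ArrayList<>();
        long start = System.currentTimeMillis();
        logs.add("Start expire batch of " + snapshotIds.size() + " tasks");
        int done = 0;
        for (long id : snapshotIds) {
            done++;
            // Per-task progress line with a running counter.
            logs.add(String.format(
                    "Processing expire task %d/%d, snapshot %d",
                    done, snapshotIds.size(), id));
        }
        logs.add("Finished expire batch in "
                + (System.currentTimeMillis() - start) + " ms");
        return logs;
    }
}
```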

* the serial implementation. It extends the production {@link RangePartitionedExpireFunction} and
* {@link SnapshotExpireSink} classes, overriding {@code initExecutor} to inject test executors.
*/
public class ExpireSnapshotsActionITCase extends ExpireSnapshotsTest {
Contributor:

Can we add a test case verifying that, if the snapshot expiration job fails accidentally in the middle, it can still complete the expiration after recovery?

Contributor (Author):

Done. Added testExpireRecoveryAfterPartialFailure: uses a failAfterTasks flag in TestExpireFunction to simulate a mid-job crash during the worker phase (some data files deleted, but snapshot metadata intact). Then runs expire again and verifies it completes successfully despite missing files.

import java.util.function.Supplier;

/**
* IT Case for parallel expire snapshots using Flink execution. This class extends {@link
Contributor:

Given that this ITCase is specifically designed to test parallel mode, it is better to modify the name of this test class to reflect this point.

Contributor (Author):

Done. Renamed ExpireSnapshotsActionITCase to ParallelExpireSnapshotsActionITCase to clarify that this test class is specifically for parallel mode.


@Test
@Disabled(
"Tests concurrent expire scenario which has different semantics in Flink parallel mode")
Contributor:

Could you please elaborate how the two modes are different, and whether this difference would bring an influence on the expected expiration result for users?

Contributor (Author):

Done.

case DELETE_MANIFESTS:
return executeDeleteManifests(task, snapshot, skippingSet);
case DELETE_SNAPSHOT:
return executeDeleteSnapshot(task, snapshot);
Contributor:

It might be better to convert 4 TaskTypes to 4 subclasses of SnapshotExpireTask. The 4 switch branches here can be converted to a method in the SnapshotExpireTask interface/abstract class, and provide different implementations for this method in the 4 subclasses.

@wzhero1 (Contributor, Author) commented Mar 23, 2026:

Refactored as suggested. Replaced the ExpireSnapshotsExecutor switch-based dispatch with polymorphic SnapshotExpireTask subclasses (DeleteDataFilesTask, DeleteChangelogFilesTask, DeleteManifestsTask, DeleteSnapshotTask). Each subclass encapsulates its own deletion logic via execute(ExpireContext). Also simplified ExpireContext by removing delegate methods — callers now access underlying objects directly (e.g., context.snapshotDeletion().cleanUnusedDataFiles()). taggedSnapshots and snapshotCache are now constructor parameters of ExpireContext instead of setters.

* <li>Phase 2b: Delete all snapshot files (using snapshotFileTasks)
* </ul>
*/
public class ExpireSnapshotsImpl implements ExpireSnapshots {
Contributor:

It might be better to avoid doing a complete re-structure to ExpireSnapshotsImpl if it has been working well. Instead, we can have most of the logic in paimon-flink.

Contributor (Author):

Done. Removed the complete restructure of ExpireSnapshotsImpl.

@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch 5 times, most recently from 1aa9654 to 35d548b Compare March 23, 2026 02:05
@wzhero1 wzhero1 force-pushed the feat/paimon-expire-snapshot-parallel-opt branch from 35d548b to 8985e0e Compare March 23, 2026 03:09
@wzhero1 (Contributor, Author) commented Mar 23, 2026

Based on actual implementation:

PR Comment work items:

  1. Empty plan logging: Adjusted log message to "No snapshots to expire, skipping Flink job submission." (still returns early, not submitting an empty job)
  2. Sink operator rename: Renamed sink operator from "SnapshotExpire" to "SnapshotExpireCommit" to distinguish from worker phase
  3. Inline executeTask: Removed single-use method, inlined task execution directly into flatMap
  4. Progress logging: Added per-task progress log ("Processing expire task {}/{}, {}"), start/end logs with elapsed time
  5. Failure recovery test: Added testExpireRecoveryAfterPartialFailure with failure injection via volatile int failAfterTasks
  6. Test class rename: ExpireSnapshotsActionITCase → ParallelExpireSnapshotsActionITCase
  7. Disabled test comments: Added @Disabled annotations with explanations for tests not suitable for parallel mode
  8. Polymorphic refactoring: Replaced TaskType enum + ExpireSnapshotsExecutor switch dispatch with 4 polymorphic SnapshotExpireTask subclasses, deleted ExpireSnapshotsExecutor
  9. Logic migration: Moved planner/plan/task/report classes from paimon-core to paimon-flink package

Additional changes (not from comments):

  • ExpireContext simplification: Removed delegate methods, callers access underlying objects directly (context.snapshotDeletion().xxx()). Made taggedSnapshots and snapshotCache final constructor parameters; only skippingSet retains a setter (mutated during manifest deletion)
  • snapshotCache in parallel mode: Propagated planner's pre-collected snapshot cache to both workers and sink via serialization, avoiding redundant snapshot file reads
  • Trigger condition: Changed to forceStartFlinkJob || parallelism > 1 — either condition enters Flink parallel pipeline

@wzhero1 wzhero1 marked this pull request as ready for review March 23, 2026 06:52