Skip to content

feat(clean): support clean-by-time retention boundary and archive protection#19041

Open
fhan688 wants to merge 3 commits into
apache:masterfrom
fhan688:support-clean-by-time
Open

feat(clean): support clean-by-time retention boundary and archive protection#19041
fhan688 wants to merge 3 commits into
apache:masterfrom
fhan688:support-clean-by-time

Conversation

@fhan688

@fhan688 fhan688 commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Describe the issue this Pull Request addresses

KEEP_LATEST_BY_HOURS calculates the earliest commit to retain (ECTR) as the first completed instant at or after the configured time cutoff. This PR preserves that existing boundary semantics.

The issue addressed here is that the by-hours ECTR should not move past the earliest pending instant. If a pending instant exists before the computed ECTR, the cleaner should retain from the completed instant before that
pending instant, or retain everything when no such completed instant exists. This prevents cleaning decisions from crossing pending writes.

This PR also makes timeline archival optionally respect the latest completed clean's earliestCommitToRetain consistently across timeline archiver versions, and exposes existing cleaner/archive configs through the Flink
write path.

Summary and Changelog

This PR adds clean-by-time safety improvements without introducing any LSM-table-specific behavior.

Changes:

  • Preserve existing KEEP_LATEST_BY_HOURS cutoff semantics by selecting the first completed instant at or after the retention cutoff.
  • Prevent by-hours ECTR from crossing the earliest pending instant.
  • Keep ECTR empty when there is no completed instant at or after the cutoff, even if pending instants exist.
  • Keep ECTR empty when the earliest pending instant precedes all completed instants.
  • Add a shared archival utility to derive the earliest instant to retain from the latest completed clean metadata.
  • Apply clean ECTR archive blocking to both TimelineArchiverV1 and TimelineArchiverV2 when hoodie.archive.block.on.clean.ectr is enabled.
  • Expose Flink options for:
    • hoodie.clean.max.commits
    • hoodie.clean.empty.commit.interval.hours
    • hoodie.archive.block.on.clean.ectr
  • Add tests for by-hours ECTR selection, pending instant protection, V2 archival behavior, and Flink config propagation.

No code was copied from another project.

Impact

User-facing behavior changes:

  • Tables using KEEP_LATEST_BY_HOURS keep the existing time-cutoff boundary semantics, while avoiding clean boundaries that cross pending instants..
  • When archive blocking on clean ECTR is enabled, timeline archiving avoids archiving commits that may still be needed because their data files have not been cleaned yet.
  • Flink writers can configure the existing clean/archive controls through Flink options.

This may retain more active timeline instants in some cases when clean ECTR archive blocking is enabled, which is expected for correctness.

Risk Level

medium

The change affects cleaner retention boundary calculation and timeline archival retention decisions. The implementation is conservative: it avoids cleaning past pending instants and only blocks
archival on clean ECTR when the existing config is enabled.

Verification:

  • mvn -pl hudi-common -am -DskipTests -DskipITs -Dcheckstyle.skip=true test-compile
  • mvn -pl hudi-flink-datasource/hudi-flink -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=TestFlinkWriteClients#testCleanByTimeConfigsPropagateToWriteConfig test
  • git diff --check

Documentation Update

The Hudi website/config documentation should be updated for the newly exposed Flink options:

  • hoodie.clean.max.commits
  • hoodie.clean.empty.commit.interval.hours
  • hoodie.archive.block.on.clean.ectr

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR corrects the KEEP_LATEST_BY_HOURS retention boundary with a pending-instant guard, extracts the clean-ECTR archive-blocking logic into a shared util used by both TimelineArchiverV1 and TimelineArchiverV2, and surfaces the relevant clean/archive configs through Flink. One edge case in the new by-hours pending guard worth confirming inline. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and simplification suggestions below.

.filter(i -> compareTimestamps(i.requestedTime(), GREATER_THAN_OR_EQUALS, earliestTimeToRetain))
.findFirst());

Option<HoodieInstant> earliestPendingCommit = commitsTimeline.filter(s -> !s.isCompleted()).firstInstant();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Could you confirm the intent when earliestCommitToRetain is initially empty (no completed commit at-or-after the cutoff) but a pending instant exists? With this change, ECTR gets set to the last completed before the pending — typically the latest completed — so a quiet table with one in-flight commit would flip from "no cleaning" to "clean everything older than the latest completed". The parallel KEEP_LATEST_COMMITS branch avoids this because its pending check is nested under countInstants > commitsRetained. Was this asymmetry intended, or should the override only apply when the initial ECTR was already present?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Option<HoodieInstant> earliestInstantToRetain =
commitTimeline.findInstantsAfterOrEquals(earliestCommitToRetain).firstInstant();
log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Earliest instant to retain is {}",
earliestCommitToRetain, lastCleanInstant.get().requestedTime(), earliestInstantToRetain.map(instant -> instant).orElse(null));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: .map(instant -> instant) is a no-op identity transform — could you drop it and use earliestInstantToRetain.orElse(null) directly?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

.withFallbackKeys("hoodie.clean.fileversions.retained")
.withDescription("Number of file versions to retain. default 5");

public static final ConfigOption<Long> CLEAN_MAX_COMMITS_TO_CLEAN = ConfigOptions

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the _TO_CLEAN suffix is redundant here — every other option in this class uses the pattern CLEAN_<descriptor> (e.g. CLEAN_RETAIN_COMMITS, CLEAN_RETAIN_HOURS) without repeating the namespace. Could you rename this to CLEAN_MAX_COMMITS to stay consistent?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i -> compareTimestamps(i.requestedTime(),
GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream()
.filter(i -> compareTimestamps(i.requestedTime(), GREATER_THAN_OR_EQUALS, earliestTimeToRetain))

@yihua yihua Jun 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still selects the first instant >= the cutoff, but the PR description says the fix is to pick the instant at or before the cutoff. Should this be selecting the instant at or before the cutoff instead? As written, only the new pending-instant guard below changes behavior, so I am not sure the sparse-commit-density case from the issue is covered. Could the description and code be reconciled here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still selects the first instant >= the cutoff, but the PR description says the fix is to pick the instant at or before the cutoff. That change isn't here, so the sparse-commit-density bug described in the issue isn't actually addressed — only the new pending-instant guard below is. Please reconcile the description with the code (or add the intended change).

Good catch. The code should keep selecting the first completed instant >= the cutoff. That is the existing clean-by-hours boundary semantics used by CleanPlanner, where ECTR is the first retained instant. Selecting an instant at or before the cutoff would move the boundary too far back and breaks the existing clean plan expectations.

The PR description was inaccurate here. I will update it to say that this patch preserves the >= cutoff selection and only adds pending-instant protection so the by-hours ECTR does not move past the earliest pending instant.

if (earliestPendingCommit.isPresent()
&& (!earliestCommitToRetain.isPresent()
|| compareTimestamps(earliestPendingCommit.get().requestedTime(), LESSER_THAN, earliestCommitToRetain.get().requestedTime()))) {
earliestCommitToRetain = completedCommitsTimeline.findInstantsBefore(earliestPendingCommit.get().requestedTime()).lastInstant();

@yihua yihua Jun 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the earliest pending commit precedes all completed commits, would findInstantsBefore(...).lastInstant() return empty here, collapsing the by-hours ECTR to Option.empty()? If so, does an empty ECTR mean "retain everything" rather than "clean everything" downstream? Might it be worth a test for the pending-older-than-all-completed case too, since the new test only covers a completed commit existing before the pending one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the earliest pending commit precedes all completed commits, findInstantsBefore(...).lastInstant() returns empty and the by-hours ECTR collapses to Option.empty(). Please confirm an empty ECTR means "retain everything" (not "clean everything") downstream, and add a test for the pending-older-than-all-completed case — the new test only covers a completed commit existing before the pending one.

Yes, findInstantsBefore(...).lastInstant() can return empty in that case. Downstream, an empty ECTR means no partition scan/no cleaning for commit/hour based cleaning: CleanPlanner#getPartitionPathsForCleanByCommits
returns an empty partition list when the retained instant is absent.

I added a test for the pending-before-all-completed case. I also tightened the guard so the pending override only applies when the initial by-hours ECTR is present. This avoids turning a "no completed instant after
cutoff" case into a synthetic ECTR based on the pending instant.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, findInstantsBefore(...).lastInstant() can return empty in that case. Downstream, an empty ECTR means no partition scan/no cleaning for commit/hour based cleanin

Could you elaborate why the cleaing be blocked by a pending instant here? since we have completion time file slicing(pending files always belong to the latest one), it is okey to forward the cleaning, and the ECTR should be based on completion time too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, findInstantsBefore(...).lastInstant() can return empty in that case. Downstream, an empty ECTR means no partition scan/no cleaning for commit/hour based cleanin

Could you elaborate why the cleaing be blocked by a pending instant here? since we have completion time file slicing(pending files always belong to the latest one), it is okey to forward the cleaning, and the ECTR should be based on completion time too?

Thanks, this is a good point. I will re-check this against the current completion-time file slicing behavior. If pending files are always attached to the latest slice and the cleaner already preserves that slice, the by-
hours pending guard may be overly conservative.

I’ll verify this with a clean-plan level test instead of only testing CleanerUtils. I’ll also check whether the by-hours ECTR should be derived from completion time rather than requested time. If the existing file-
slice protection is sufficient, I’ll remove the pending guard change and update the PR scope accordingly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, findInstantsBefore(...).lastInstant() can return empty in that case. Downstream, an empty ECTR means no partition scan/no cleaning for commit/hour based cleanin

Could you elaborate why the cleaing be blocked by a pending instant here? since we have completion time file slicing(pending files always belong to the latest one), it is okey to forward the cleaning, and the ECTR should be based on completion time too?

Thanks, this is a good point. I will re-check this against the current completion-time file slicing behavior. If pending files are always attached to the latest slice and the cleaner already preserves that slice, the by- hours pending guard may be overly conservative.

I’ll verify this with a clean-plan level test instead of only testing CleanerUtils. I’ll also check whether the by-hours ECTR should be derived from completion time rather than requested time. If the existing file- slice protection is sufficient, I’ll remove the pending guard change and update the PR scope accordingly.

Additional Information:
I validated the file-slice protection path. Pending log files with no completion time are assigned to the latest file slice, and the cleaner retains the latest slice, so forcing a by-hours ECTR after the pending instant
does not schedule the pending log for deletion. The pending-files check in CleanPlanner mainly protects partition deletion.

That said, your point about by-hours ECTR being based on completion time looks valid. KEEP_LATEST_BY_HOURS currently computes ECTR by filtering completed instants on requestedTime(), while timeline v2 exposes completion-time ordering. I think we should avoid relying on a global pending guard for file deletion and instead focus this PR on making by-hours ECTR completion-time based, plus any archive protection needed around that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should avoid relying on a global pending guard for file deletion and instead focus this PR on making by-hours ECTR completion-time based, plus any archive protection needed around that.

The archival is blocked automatically by pending instants.

// 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// 4. If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive
// commits that have data files that haven't been cleaned yet.
earliestInstantToRetainCandidates.add(getEarliestInstantToRetainForClean(table, completedCommitsTimeline, config));

@yihua yihua Jun 20, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes TimelineArchiverV2 (LSM/v9) block on clean ECTR, which inverts the previously-asserted behavior (the old test documented V1-only). Was V1-only an oversight rather than a deliberate LSM restriction? One thing to watch: V2 archival now also aborts with HoodieIOException if the clean-metadata read fails (when the config is enabled). Is that an acceptable new failure mode here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes TimelineArchiverV2 (LSM/v9) block on clean ECTR, inverting the previously-asserted behavior (the old test documented V1-only). Please confirm V1-only was an oversight rather than a deliberate LSM restriction. Note this also adds a new failure mode: V2 archival now aborts with HoodieIOException if the clean-metadata read fails (when the config is enabled).

The V1-only behavior looks like an oversight rather than an intentional LSM/v9 restriction. The config is about preventing archival from crossing the latest clean ECTR, so the safety boundary should be independent of the archiver version.

For the read failure case, this intentionally keeps the same fail-closed behavior as V1 when hoodie.archive.block.on.clean.ectr is enabled. If the clean metadata cannot be read, we cannot safely know the archival boundary, so aborting is preferable to archiving through an unknown clean ECTR. The config remains disabled by default, so this only affects users who explicitly enable the protection.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @nsivabalan to chime in here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @nsivabalan to chime in here

Thanks, I'll wait for Siva's input here.

@github-actions github-actions Bot added the size:M PR with lines of changes in (100, 300] label Jun 20, 2026
@hudi-bot

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

.withDescription("Number of file versions to retain. default 5");

public static final ConfigOption<Long> CLEAN_MAX_COMMITS_TO_CLEAN = ConfigOptions
.key(HoodieCleanConfig.MAX_COMMITS_TO_CLEAN.key())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add option dupilicates for Flink, they are applied automatically.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add option dupilicates for Flink, they are applied automatically.

Thanks, that makes sense. I checked the Flink write config path again: FlinkWriteClients#getHoodieClientConfig already calls withProps(flinkConf2TypedProperties(conf)), and flinkConf2TypedProperties propagates raw hoodie.* options into HoodieWriteConfig. The clean/archive getters then read these values directly from the final props.

So the explicit FlinkOptions entries and builder calls are redundant here. I will remove the added Flink option constants, the explicit propagation in FlinkWriteClients, the related Flink test changes, and update the
PR description accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants