Skip to content

Commit 69b0a18

Browse files
author
bosiew.tian
committed
[flink] Support parallelism snapshot expire
1 parent 395c706 commit 69b0a18

File tree

20 files changed

+2308
-359
lines changed

20 files changed

+2308
-359
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,29 @@ public void cleanEmptyDirectories() {
132132
if (!cleanEmptyDirectories || deletionBuckets.isEmpty()) {
133133
return;
134134
}
135+
doCleanEmptyDirectories(deletionBuckets);
136+
deletionBuckets.clear();
137+
}
135138

139+
/**
140+
* Try to delete data directories that may be empty after data file deletion.
141+
*
142+
* <p>This method uses externally provided buckets instead of internal deletionBuckets. Used in
143+
* parallel expire mode where buckets are aggregated from multiple workers.
144+
*
145+
* @param aggregatedBuckets merged deletion buckets from all workers
146+
*/
147+
public void cleanEmptyDirectories(Map<BinaryRow, Set<Integer>> aggregatedBuckets) {
148+
if (!cleanEmptyDirectories || aggregatedBuckets.isEmpty()) {
149+
return;
150+
}
151+
doCleanEmptyDirectories(aggregatedBuckets);
152+
}
153+
154+
private void doCleanEmptyDirectories(Map<BinaryRow, Set<Integer>> buckets) {
136155
// All directory paths are deduplicated and sorted by hierarchy level
137156
Map<Integer, Set<Path>> deduplicate = new HashMap<>();
138-
for (Map.Entry<BinaryRow, Set<Integer>> entry : deletionBuckets.entrySet()) {
157+
for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
139158
List<Path> toDeleteEmptyDirectory = new ArrayList<>();
140159
// try to delete bucket directories
141160
for (Integer bucket : entry.getValue()) {
@@ -162,8 +181,6 @@ public void cleanEmptyDirectories() {
162181
for (int hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) {
163182
deduplicate.get(hierarchy).forEach(this::tryDeleteEmptyDirectory);
164183
}
165-
166-
deletionBuckets.clear();
167184
}
168185

169186
protected void recordDeletionBuckets(ExpireFileEntry entry) {
@@ -172,6 +189,23 @@ protected void recordDeletionBuckets(ExpireFileEntry entry) {
172189
.add(entry.bucket());
173190
}
174191

192+
/**
193+
* Get and clear the deletion buckets.
194+
*
195+
* <p>This method is used in parallel expire to collect buckets for a single task without
196+
* accumulating results from previous tasks processed by the same worker.
197+
*
198+
* @return a copy of the deletion buckets, the internal state is cleared after this call
199+
*/
200+
public Map<BinaryRow, Set<Integer>> drainDeletionBuckets() {
201+
Map<BinaryRow, Set<Integer>> result = new HashMap<>();
202+
for (Map.Entry<BinaryRow, Set<Integer>> entry : deletionBuckets.entrySet()) {
203+
result.put(entry.getKey(), new HashSet<>(entry.getValue()));
204+
}
205+
deletionBuckets.clear();
206+
return result;
207+
}
208+
175209
public void cleanUnusedDataFiles(String manifestList, Predicate<ExpireFileEntry> skipper) {
176210
// try read manifests
177211
List<ManifestFileMeta> manifests = tryReadManifestList(manifestList);
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.expire;
20+
21+
import org.apache.paimon.data.BinaryRow;
22+
23+
import java.io.Serializable;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
/** Report of a single snapshot expiration task. */
29+
public class DeletionReport implements Serializable {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
private final long snapshotId;
34+
35+
/** Whether this task was skipped (e.g., snapshot already deleted). */
36+
private boolean skipped;
37+
38+
/** Whether data files were deleted. */
39+
private boolean dataFilesDeleted;
40+
41+
/** Whether changelog data files were deleted. */
42+
private boolean changelogDeleted;
43+
44+
/** Whether manifest files were deleted. */
45+
private boolean manifestsDeleted;
46+
47+
/** Whether snapshot metadata file was deleted. */
48+
private boolean snapshotDeleted;
49+
50+
/** Buckets that had files deleted (for empty directory cleanup in parallel phase). */
51+
private Map<BinaryRow, Set<Integer>> deletionBuckets;
52+
53+
public DeletionReport(long snapshotId) {
54+
this.snapshotId = snapshotId;
55+
this.skipped = false;
56+
this.dataFilesDeleted = false;
57+
this.changelogDeleted = false;
58+
this.manifestsDeleted = false;
59+
this.snapshotDeleted = false;
60+
this.deletionBuckets = new HashMap<>();
61+
}
62+
63+
/**
64+
* Create a skipped report for a snapshot that was already deleted.
65+
*
66+
* @param snapshotId the snapshot ID
67+
* @return a skipped deletion report
68+
*/
69+
public static DeletionReport skipped(long snapshotId) {
70+
DeletionReport report = new DeletionReport(snapshotId);
71+
report.skipped = true;
72+
return report;
73+
}
74+
75+
public long snapshotId() {
76+
return snapshotId;
77+
}
78+
79+
public boolean isSkipped() {
80+
return skipped;
81+
}
82+
83+
public void setDataFilesDeleted(boolean dataFilesDeleted) {
84+
this.dataFilesDeleted = dataFilesDeleted;
85+
}
86+
87+
public void setChangelogDeleted(boolean changelogDeleted) {
88+
this.changelogDeleted = changelogDeleted;
89+
}
90+
91+
public void setManifestsDeleted(boolean manifestsDeleted) {
92+
this.manifestsDeleted = manifestsDeleted;
93+
}
94+
95+
public void setSnapshotDeleted(boolean snapshotDeleted) {
96+
this.snapshotDeleted = snapshotDeleted;
97+
}
98+
99+
public void setDeletionBuckets(Map<BinaryRow, Set<Integer>> deletionBuckets) {
100+
this.deletionBuckets = deletionBuckets;
101+
}
102+
103+
public Map<BinaryRow, Set<Integer>> deletionBuckets() {
104+
return deletionBuckets;
105+
}
106+
107+
@Override
108+
public String toString() {
109+
return "DeletionReport{"
110+
+ "snapshotId="
111+
+ snapshotId
112+
+ ", skipped="
113+
+ skipped
114+
+ ", dataFilesDeleted="
115+
+ dataFilesDeleted
116+
+ ", changelogDeleted="
117+
+ changelogDeleted
118+
+ ", manifestsDeleted="
119+
+ manifestsDeleted
120+
+ ", snapshotDeleted="
121+
+ snapshotDeleted
122+
+ ", deletionBucketsCount="
123+
+ deletionBuckets.size()
124+
+ '}';
125+
}
126+
}

0 commit comments

Comments
 (0)