diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 5f0bcbde50..6d60e40988 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -18,9 +18,11 @@ package org.apache.fluss.client.admin; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.client.metadata.ActiveKvSnapshots; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.metadata.RemoteLogManifestInfo; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.rebalance.GoalType; import org.apache.fluss.cluster.rebalance.RebalanceProgress; @@ -796,4 +798,23 @@ CompletableFuture registerProducerOffsets( * @since 1.0 */ CompletableFuture getClusterHealth(); + + /** + * List per-bucket remote log manifest entries for a table or partition scope. + * + * @param tableId the table to query + * @param partitionId optional partition id (null for non-partitioned tables) + * @return per-bucket manifest paths and end offsets + */ + CompletableFuture> listRemoteLogManifests( + long tableId, @Nullable Long partitionId); + + /** + * List per-bucket active KV snapshot ids for a table or partition scope. + * + * @param tableId the table to query + * @param partitionId optional partition id (null for non-partitioned tables) + * @return per-bucket active snapshot ids grouped by bucket + */ + CompletableFuture listKvSnapshots(long tableId, @Nullable Long partitionId); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index f724a15b84..80f5d83e56 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -18,10 +18,12 @@ package org.apache.fluss.client.admin; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.client.metadata.ActiveKvSnapshots; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.client.metadata.RemoteLogManifestInfo; import org.apache.fluss.client.utils.ClientRpcMessageUtils; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; @@ -81,10 +83,12 @@ import org.apache.fluss.rpc.messages.ListAclsRequest; import org.apache.fluss.rpc.messages.ListDatabasesRequest; import org.apache.fluss.rpc.messages.ListDatabasesResponse; +import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest; import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListOffsetsResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.PbAlterConfig; @@ -392,6 +396,36 @@ public CompletableFuture> listPartitionInfos( .thenApply(ClientRpcMessageUtils::toPartitionInfos); } + /** + * Returns per-bucket remote log manifest path for the given table or partition. + * + *

Used by the orphan cleanup action to construct the active manifest path set without + * relying on FS LIST + mtime selection. + */ + @Override + public CompletableFuture> listRemoteLogManifests( + long tableId, @Nullable Long partitionId) { + ListRemoteLogManifestsRequest request = new ListRemoteLogManifestsRequest(); + request.setTableId(tableId); + if (partitionId != null) { + request.setPartitionId(partitionId); + } + return gateway.listRemoteLogManifests(request) + .thenApply(ClientRpcMessageUtils::toRemoteLogManifestInfos); + } + + @Override + public CompletableFuture listKvSnapshots( + long tableId, @Nullable Long partitionId) { + ListKvSnapshotsRequest request = new ListKvSnapshotsRequest(); + request.setTableId(tableId); + if (partitionId != null) { + request.setPartitionId(partitionId); + } + return gateway.listKvSnapshots(request) + .thenApply(ClientRpcMessageUtils::toActiveKvSnapshots); + } + @Override public CompletableFuture createPartition( TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ActiveKvSnapshots.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ActiveKvSnapshots.java new file mode 100644 index 0000000000..3950c69ef1 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ActiveKvSnapshots.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.metadata; + +import org.apache.fluss.annotation.PublicEvolving; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Per-bucket active KV snapshot ids for a table or partition scope. The active set is the union of + * retained snapshots and lease-pinned (still-in-use) snapshots reported by the coordinator. + * + * @since 1.0 + */ +@PublicEvolving +public final class ActiveKvSnapshots { + + private final long tableId; + @Nullable private final Long partitionId; + private final Map> snapshotIdsByBucket; + + public ActiveKvSnapshots( + long tableId, @Nullable Long partitionId, Map> snapshotIdsByBucket) { + this.tableId = tableId; + this.partitionId = partitionId; + Map> copy = new HashMap<>(); + for (Map.Entry> entry : snapshotIdsByBucket.entrySet()) { + copy.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue())); + } + this.snapshotIdsByBucket = Collections.unmodifiableMap(copy); + } + + public long getTableId() { + return tableId; + } + + @Nullable + public Long getPartitionId() { + return partitionId; + } + + public Map> getSnapshotIdsByBucket() { + return snapshotIdsByBucket; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/RemoteLogManifestInfo.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/RemoteLogManifestInfo.java new file mode 100644 index 0000000000..fa0341771b --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/RemoteLogManifestInfo.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.metadata; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +/** + * A single remote log manifest entry returned by the coordinator. Each entry maps a {@link + * TableBucket} to its current manifest file path and the end offset covered by that manifest. + * + * @since 1.0 + */ +@PublicEvolving +public final class RemoteLogManifestInfo { + + private final TableBucket tableBucket; + private final String remoteLogManifestPath; + private final long remoteLogEndOffset; + + public RemoteLogManifestInfo( + TableBucket tableBucket, String remoteLogManifestPath, long remoteLogEndOffset) { + this.tableBucket = tableBucket; + this.remoteLogManifestPath = remoteLogManifestPath; + this.remoteLogEndOffset = remoteLogEndOffset; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public String getRemoteLogManifestPath() { + return remoteLogManifestPath; + } + + public long getRemoteLogEndOffset() { + return remoteLogEndOffset; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 3f6e2443a9..2df5b91efc 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -24,9 +24,11 @@ import org.apache.fluss.client.lookup.LookupBatch; import org.apache.fluss.client.lookup.PrefixLookupBatch; import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult; +import org.apache.fluss.client.metadata.ActiveKvSnapshots; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.metadata.RemoteLogManifestInfo; import org.apache.fluss.client.write.KvWriteBatch; import org.apache.fluss.client.write.ReadyWriteBatch; import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; @@ -61,9 +63,11 @@ import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; import org.apache.fluss.rpc.messages.GetTableStatsRequest; import org.apache.fluss.rpc.messages.ListDatabasesResponse; +import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse; import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.PbAddColumn; @@ -87,6 +91,7 @@ import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket; import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket; import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable; +import org.apache.fluss.rpc.messages.PbRemoteLogManifestEntry; import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile; import org.apache.fluss.rpc.messages.PbRenameColumn; import org.apache.fluss.rpc.messages.PbTableBucket; @@ -106,6 +111,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -536,6 +542,34 @@ public static ReleaseKvSnapshotLeaseRequest makeReleaseKvSnapshotLeaseRequest( return request; } + public static List toRemoteLogManifestInfos( + ListRemoteLogManifestsResponse response) { + List result = new ArrayList<>(response.getManifestsCount()); + for (PbRemoteLogManifestEntry entry : response.getManifestsList()) { + PbTableBucket pb = entry.getTableBucket(); + Long partitionId = pb.hasPartitionId() ? pb.getPartitionId() : null; + TableBucket tableBucket = + new TableBucket(pb.getTableId(), partitionId, pb.getBucketId()); + result.add( + new RemoteLogManifestInfo( + tableBucket, + entry.getRemoteLogManifestPath(), + entry.getRemoteLogEndOffset())); + } + return result; + } + + public static ActiveKvSnapshots toActiveKvSnapshots(ListKvSnapshotsResponse response) { + Map> snapshotIdsByBucket = new HashMap<>(); + for (PbKvSnapshot snapshot : response.getActiveSnapshotsList()) { + snapshotIdsByBucket + .computeIfAbsent(snapshot.getBucketId(), k -> new HashSet<>()) + .add(snapshot.getSnapshotId()); + } + Long partitionId = response.hasPartitionId() ? response.getPartitionId() : null; + return new ActiveKvSnapshots(response.getTableId(), partitionId, snapshotIdsByBucket); + } + public static Optional toRebalanceProgress( ListRebalanceProgressResponse response) { if (!response.hasRebalanceId()) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java index 792187cc84..e8120c83d2 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java @@ -26,9 +26,11 @@ import org.apache.fluss.client.admin.OffsetSpec; import org.apache.fluss.client.admin.ProducerOffsetsResult; import org.apache.fluss.client.admin.RegisterResult; +import org.apache.fluss.client.metadata.ActiveKvSnapshots; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.metadata.RemoteLogManifestInfo; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.rebalance.GoalType; import org.apache.fluss.cluster.rebalance.RebalanceProgress; @@ -323,4 +325,16 @@ public CompletableFuture getReadableLakeSnapshot(TablePath tablePa public CompletableFuture getClusterHealth() { throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); } + + @Override + public CompletableFuture> listRemoteLogManifests( + long tableId, @Nullable Long partitionId) { + throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); + } + + @Override + public CompletableFuture listKvSnapshots( + long tableId, @Nullable Long partitionId) { + throw new UnsupportedOperationException("Not implemented in TestAdminAdapter"); + } } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 21d286aa49..44c362a54d 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -51,8 +51,12 @@ import org.apache.fluss.rpc.messages.DropTableResponse; import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; +import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest; +import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; @@ -216,4 +220,29 @@ CompletableFuture dropKvSnapshotLease( // todo: rename table & alter table + // ================================================================================== + // Orphan Cleanup RPCs (coordinator-only, internal) + // ================================================================================== + + /** + * List remote log manifest entries for all buckets of a table or single partition. + * + * @param request request with table_id and optional partition_id + * @return per-bucket manifest path and end offset + */ + @RPC(api = ApiKeys.LIST_REMOTE_LOG_MANIFESTS) + CompletableFuture listRemoteLogManifests( + ListRemoteLogManifestsRequest request); + + /** + * List active KV snapshot ids for a (tableId, partitionId) unit. The response is the union of + * (a) snapshots currently held by the in-memory {@code CompletedSnapshotStore} for each bucket + * and (b) snapshots still pinned by an active KV snapshot lease. No retention truncation is + * applied — every snapshot the coordinator has not yet pruned from ZK is reported as active so + * orphan cleanup never misdeletes a still-referenced snapshot. The server emits one entry per + * active {@code (bucket_id, snapshot_id)} pair with no source discriminator; callers must treat + * the entire response as the active set. + */ + @RPC(api = ApiKeys.LIST_KV_SNAPSHOTS) + CompletableFuture listKvSnapshots(ListKvSnapshotsRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index dd164d7cc8..b03e1ec63a 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -104,7 +104,9 @@ public enum ApiKeys { GET_TABLE_STATS(1059, 0, 0, PUBLIC), ALTER_DATABASE(1060, 0, 0, PUBLIC), SCAN_KV(1061, 0, 0, PUBLIC), - GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC); + GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC), + LIST_REMOTE_LOG_MANIFESTS(1063, 0, 0, PUBLIC), + LIST_KV_SNAPSHOTS(1064, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 1e66bc343a..41c738777a 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -506,6 +506,36 @@ message ListPartitionInfosResponse { repeated PbPartitionInfo partitions_info = 1; } +// list remote log manifest entries (one per bucket of a table or partition) +message ListRemoteLogManifestsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; // required if table is partitioned +} + +message ListRemoteLogManifestsResponse { + repeated PbRemoteLogManifestEntry manifests = 1; +} + +message PbRemoteLogManifestEntry { + required PbTableBucket table_bucket = 1; + required string remote_log_manifest_path = 2; + required int64 remote_log_end_offset = 3; +} + +// list active KV snapshot dirs (retained_N + still-in-use) for a unit +message ListKvSnapshotsRequest { + required int64 table_id = 1; + optional int64 partition_id = 2; +} + +message ListKvSnapshotsResponse { + required int64 table_id = 1; + // null if it is a non-partitioned table, otherwise, it must be not null + optional int64 partition_id = 2; + // Active snapshots = retained_N ∪ still-in-use; multiple entries per bucket allowed. + repeated PbKvSnapshot active_snapshots = 3; +} + // create partition request and response message CreatePartitionRequest { required PbTablePath table_path = 1; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java index 6e483853c9..f7c1b9bf6b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.coordinator; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.exception.ApiException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.MetricNames; @@ -35,9 +36,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -246,6 +251,54 @@ private CompletedSnapshotStore createCompletedSnapshotStore( snapshotInUseChecker); } + /** + * Returns active snapshot IDs per bucket for the given (tableId, partitionId) scope. For + * buckets with an in-memory {@link CompletedSnapshotStore}, the cached active set is returned + * (completed snapshots ∪ still-in-use snapshots, no retention truncation). For other buckets, + * snapshot IDs are read directly from ZK children (no per-snapshot payload fetch). + */ + public Map> getActiveSnapshotIdsByBucket( + long tableId, @Nullable Long partitionId, int numBuckets) { + Map> result = new HashMap<>(); + for (int i = 0; i < numBuckets; i++) { + TableBucket tb = new TableBucket(tableId, partitionId, i); + CompletedSnapshotStore store = bucketCompletedSnapshotStores.get(tb); + Set ids; + if (store != null) { + ids = store.getActiveSnapshotIds(); + } else { + ids = readActiveSnapshotIdsFromZk(tb); + } + if (!ids.isEmpty()) { + result.put(i, ids); + } + } + return result; + } + + private Set readActiveSnapshotIdsFromZk(TableBucket tableBucket) { + List snapshotIds; + try { + snapshotIds = zooKeeperClient.listBucketSnapshotIds(tableBucket); + } catch (Exception e) { + throw new ApiException( + "Failed to read snapshot IDs from ZK for " + + tableBucket + + ": " + + e.getMessage(), + e); + } + if (snapshotIds.isEmpty()) { + return Collections.emptySet(); + } + // Treat every snapshot still referenced in ZK as active. As long as the + // coordinator has not pruned a snapshot's ZK handle, it is potentially in use + // (lease, in-flight RPC, recovery, etc.), so the orphan cleaner must keep its + // files. Orphan cleanup is conservative by design — a slightly broader active + // set is harmless, while excluding a still-referenced snapshot is not. + return new HashSet<>(snapshotIds); + } + @VisibleForTesting Map getBucketCompletedSnapshotStores() { return bucketCompletedSnapshotStores; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index c2e297f19c..66dee8051c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -2333,7 +2333,6 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List coordinatorEpochSupplier; private final CoordinatorMetadataCache metadataCache; + private final Supplier snapshotStoreManagerSupplier; private final LakeTableTieringManager lakeTableTieringManager; private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; private final ExecutorService ioExecutor; @@ -278,6 +289,8 @@ public CoordinatorService( () -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager(); this.coordinatorEpochSupplier = () -> coordinatorEventProcessorSupplier.get().getCoordinatorEpoch(); + this.snapshotStoreManagerSupplier = + () -> coordinatorEventProcessorSupplier.get().completedSnapshotStoreManager(); this.lakeTableTieringManager = lakeTableTieringManager; this.metadataCache = metadataCache; this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader; @@ -785,6 +798,162 @@ public CompletableFuture metadata(MetadataRequest request) { return metadataResponseAccessContextEvent.getResultFuture(); } + @Override + public CompletableFuture listRemoteLogManifests( + ListRemoteLogManifestsRequest request) { + long tableId = request.getTableId(); + if (authorizer != null) { + authorizeTableWithSession(currentSession(), OperationType.DESCRIBE, tableId); + } + return CompletableFuture.supplyAsync( + () -> { + Long partitionId = request.hasPartitionId() ? request.getPartitionId() : null; + validatePartitionOwnership(partitionId, tableId); + try { + List entries = + zkClient.listRemoteLogManifestHandles(tableId, partitionId); + return makeListRemoteLogManifestsResponse(entries); + } catch (ApiException e) { + throw e; + } catch (Exception e) { + throw new UnknownServerException( + "Failed to list remote log manifests for tableId=" + tableId, e); + } + }, + ioExecutor); + } + + @Override + public CompletableFuture listKvSnapshots( + ListKvSnapshotsRequest request) { + long tableId = request.getTableId(); + Long partitionId = request.hasPartitionId() ? request.getPartitionId() : null; + if (authorizer != null) { + authorizeTableWithSession(currentSession(), OperationType.DESCRIBE, tableId); + } + + // Resolve numBuckets via event thread (CoordinatorContext is @NotThreadSafe) + CompletableFuture numBucketsFuture = resolveNumBuckets(tableId, partitionId); + + return numBucketsFuture.thenCompose( + numBuckets -> + CompletableFuture.supplyAsync( + () -> { + validatePartitionOwnership(partitionId, tableId); + + CompletedSnapshotStoreManager storeManager = + snapshotStoreManagerSupplier.get(); + Map> activeByBucket = + storeManager.getActiveSnapshotIdsByBucket( + tableId, partitionId, numBuckets); + Map> stillInUse = + kvSnapshotLeaseManager.getStillInUseSnapshotIds( + tableId, partitionId); + + ListKvSnapshotsResponse response = + new ListKvSnapshotsResponse().setTableId(tableId); + if (partitionId != null) { + response.setPartitionId(partitionId); + } + + Set allBucketIds = + new HashSet<>(activeByBucket.keySet()); + allBucketIds.addAll(stillInUse.keySet()); + for (int bucketId : allBucketIds) { + Set merged = + new LinkedHashSet<>( + activeByBucket.getOrDefault( + bucketId, Collections.emptySet())); + merged.addAll( + stillInUse.getOrDefault( + bucketId, Collections.emptySet())); + for (Long snapId : merged) { + response.addActiveSnapshot() + .setBucketId(bucketId) + .setSnapshotId(snapId); + } + } + return response; + }, + ioExecutor)); + } + + private CompletableFuture resolveNumBuckets(long tableId, @Nullable Long partitionId) { + AccessContextEvent event = + new AccessContextEvent<>( + ctx -> { + TablePath tablePath = ctx.getTablePathById(tableId); + if (tablePath != null) { + TableInfo tableInfo = ctx.getTableInfoById(tableId); + if (tableInfo != null) { + return tableInfo.getNumBuckets(); + } + } + return null; + }); + eventManagerSupplier.get().put(event); + return event.getResultFuture() + .thenCompose( + numBuckets -> { + if (numBuckets != null) { + return CompletableFuture.completedFuture(numBuckets); + } + return CompletableFuture.supplyAsync( + () -> resolveNumBucketsFromZk(tableId, partitionId), + ioExecutor); + }); + } + + private int resolveNumBucketsFromZk(long tableId, @Nullable Long partitionId) { + try { + if (partitionId != null) { + Optional pAssignment = + zkClient.getPartitionAssignment(partitionId); + if (!pAssignment.isPresent()) { + throw new PartitionNotExistException( + "Partition " + partitionId + " does not exist"); + } + return pAssignment.get().getBuckets().size(); + } else { + Optional assignment = zkClient.getTableAssignment(tableId); + if (!assignment.isPresent()) { + throw new TableNotExistException("Table " + tableId + " does not exist"); + } + return assignment.get().getBuckets().size(); + } + } catch (ApiException e) { + throw e; + } catch (Exception e) { + throw new ApiException("Failed to resolve table " + tableId + ": " + e.getMessage(), e); + } + } + + private void validatePartitionOwnership(@Nullable Long partitionId, long tableId) { + if (partitionId == null) { + return; + } + try { + Optional pa = zkClient.getPartitionAssignment(partitionId); + if (!pa.isPresent()) { + throw new PartitionNotExistException( + "Partition " + partitionId + " does not exist"); + } + if (pa.get().getTableId() != tableId) { + throw new PartitionNotExistException( + "Partition " + partitionId + " does not belong to table " + tableId); + } + } catch (ApiException e) { + throw e; + } catch (Exception e) { + throw new UnknownServerException( + "Failed to validate partition ownership for partition " + + partitionId + + " and table " + + tableId, + e); + } + } + @Override public CompletableFuture adjustIsr(AdjustIsrRequest request) { CompletableFuture response = new CompletableFuture<>(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java index 97ec1dd1a0..6c79a668e8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/lease/KvSnapshotLeaseManager.java @@ -32,14 +32,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -311,6 +314,56 @@ public boolean dropLease(String leaseId) throws Exception { }); } + /** + * Returns lease-pinned snapshot ids per bucket for the given (tableId, partitionId) unit. Used + * by the {@code ListKvSnapshots} RPC to expose STILL_IN_USE snapshots that fall outside the + * retained_N window of {@link + * org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore#stillInUseSnapshots}. + * + *

Result map keys are bucket ids; values are the set of snapshot ids that the lease layer + * currently considers pinned for that bucket. + */ + public Map> getStillInUseSnapshotIds( + long tableId, @Nullable Long partitionId) { + return inReadLock( + managerLock, + () -> { + Map> result = new HashMap<>(); + for (KvSnapshotLeaseHandler leaseHandler : kvSnapshotLeaseMap.values()) { + KvSnapshotTableLease tableLease = + leaseHandler.getTableIdToTableLease().get(tableId); + if (tableLease == null) { + continue; + } + Long[] snapshots; + if (partitionId == null) { + snapshots = tableLease.getBucketSnapshots(); + } else { + Map byPartition = tableLease.getPartitionSnapshots(); + if (byPartition == null) { + continue; + } + snapshots = byPartition.get(partitionId); + } + if (snapshots == null) { + continue; + } + // Invariant: `snapshots` is indexed by bucketId starting at 0, contiguous, + // with length >= bucket count (auto-expanded by KvSnapshotLeaseHandler when + // buckets are added). Slots not yet registered hold -1L (or null after + // certain partitioned-table paths). See KvSnapshotLeaseHandler#addBucket + // (bucketSnapshot[bucketId] = snapshotId) for the writer-side enforcement. + for (int bucketId = 0; bucketId < snapshots.length; bucketId++) { + Long snapId = snapshots[bucketId]; + if (snapId != null && snapId != -1L) { + result.computeIfAbsent(bucketId, k -> new HashSet<>()).add(snapId); + } + } + } + return result; + }); + } + private void initializeRefCount(KvSnapshotLeaseHandler lease) { for (Map.Entry tableEntry : lease.getTableIdToTableLease().entrySet()) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java index cf398636b8..7cf3e88db6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java @@ -124,6 +124,27 @@ public long getNumSnapshots() { return inLock(lock, completedSnapshots::size); } + /** + * Returns the union of every snapshot still held by this store and the snapshots recorded as + * still-in-use. Any snapshot whose ZK handle has not yet been pruned is potentially active + * (lease, in-flight RPC, recovery, etc.), so the orphan cleaner must keep its files. Orphan + * cleanup is conservative by design — a slightly broader active set is harmless, while + * excluding a still-referenced snapshot is not. + */ + public Set getActiveSnapshotIds() { + return inLock( + lock, + () -> { + Set ids = + new HashSet<>(completedSnapshots.size() + stillInUseSnapshots.size()); + for (CompletedSnapshot snapshot : completedSnapshots) { + ids.add(snapshot.getSnapshotID()); + } + ids.addAll(stillInUseSnapshots.keySet()); + return ids; + }); + } + /** * Synchronously writes the new snapshots to snapshot handle store and asynchronously removes * older ones. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..266abe3bce 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -87,6 +87,7 @@ import org.apache.fluss.rpc.messages.ListOffsetsResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse; import org.apache.fluss.rpc.messages.LookupRequest; import org.apache.fluss.rpc.messages.LookupResponse; import org.apache.fluss.rpc.messages.MetadataResponse; @@ -141,6 +142,7 @@ import org.apache.fluss.rpc.messages.PbPutKvRespForBucket; import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket; import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket; +import org.apache.fluss.rpc.messages.PbRemoteLogManifestEntry; import org.apache.fluss.rpc.messages.PbRemoteLogSegment; import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile; import org.apache.fluss.rpc.messages.PbRenameColumn; @@ -192,6 +194,7 @@ import org.apache.fluss.server.metadata.PartitionMetadata; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.zk.ZooKeeperClient.TableBucketAndManifest; import org.apache.fluss.server.zk.data.BucketSnapshot; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionRegistration; @@ -2252,4 +2255,22 @@ public static Map toTableBucketOffsets( } return offsets; } + + public static ListRemoteLogManifestsResponse makeListRemoteLogManifestsResponse( + List remoteLogManifestInfos) { + ListRemoteLogManifestsResponse response = new ListRemoteLogManifestsResponse(); + for (TableBucketAndManifest entry : remoteLogManifestInfos) { + PbRemoteLogManifestEntry pb = response.addManifest(); + PbTableBucket pbTb = pb.setTableBucket(); + pbTb.setTableId(entry.getTableBucket().getTableId()); + if (entry.getTableBucket().getPartitionId() != null) { + pbTb.setPartitionId(entry.getTableBucket().getPartitionId()); + } + pbTb.setBucketId(entry.getTableBucket().getBucket()); + pb.setRemoteLogManifestPath( + entry.getManifestHandle().getRemoteLogManifestPath().toString()); + pb.setRemoteLogEndOffset(entry.getManifestHandle().getRemoteLogEndOffset()); + } + return response; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index cb34a2f2ed..533cca6ff4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1341,6 +1341,71 @@ public Optional getRemoteLogManifestHandle(TableBucket return getOrEmpty(path).map(BucketRemoteLogsZNode::decode); } + /** + * Lists all remote log manifest handles for buckets of the given table (or partition when + * {@code partitionId != null}). Reads the {@code BucketRemoteLogsZNode} subtree under either + * {@code /tabletservers/tables/{tableId}/buckets/} or {@code + * /tabletservers/partitions/{partitionId}/buckets/} as a single ZK getChildren call followed by + * per-bucket getData. Returns each bucket's currently-published manifest path from coordinator + * metadata; callers must read the manifest file separately. + */ + public List listRemoteLogManifestHandles( + long tableId, @Nullable Long partitionId) throws Exception { + String bucketsPath = + partitionId == null + ? TableIdZNode.path(tableId) + "/buckets" + : PartitionIdZNode.path(partitionId) + "/buckets"; + List bucketIdStrs = getChildren(bucketsPath); + List result = new ArrayList<>(bucketIdStrs.size()); + for (String bucketIdStr : bucketIdStrs) { + int bucketId = Integer.parseInt(bucketIdStr); + TableBucket tb = + partitionId == null + ? new TableBucket(tableId, bucketId) + : new TableBucket(tableId, partitionId, bucketId); + Optional handle = getRemoteLogManifestHandle(tb); + handle.ifPresent(h -> result.add(new TableBucketAndManifest(tb, h))); + } + return result; + } + + /** Tuple of a table bucket and its current remote log manifest handle. */ + public static final class TableBucketAndManifest { + private final TableBucket tableBucket; + private final RemoteLogManifestHandle manifestHandle; + + public TableBucketAndManifest( + TableBucket tableBucket, RemoteLogManifestHandle manifestHandle) { + this.tableBucket = tableBucket; + this.manifestHandle = manifestHandle; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public RemoteLogManifestHandle getManifestHandle() { + return manifestHandle; + } + } + + /** + * Lists the snapshot ids of all completed bucket snapshots for the given {@link TableBucket}. + * Reads only the {@code BucketSnapshotsZNode} children — the per-snapshot payload is not + * fetched, since callers (e.g. orphan-files cleanup) only need the id set to identify which + * snapshot directories must be retained. Returned ids are ordered ascending. + */ + public List listBucketSnapshotIds(TableBucket tableBucket) throws Exception { + String path = BucketSnapshotsZNode.path(tableBucket); + List snapshotIdStrs = getChildren(path); + List ids = new ArrayList<>(snapshotIdStrs.size()); + for (String snapshotIdStr : snapshotIdStrs) { + ids.add(Long.parseLong(snapshotIdStr)); + } + Collections.sort(ids); + return ids; + } + /** Upsert the {@link LakeTable} to Zk Node. */ public void upsertLakeTable(long tableId, LakeTable lakeTable, boolean isUpdate) throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServiceOrphanRpcsITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServiceOrphanRpcsITCase.java new file mode 100644 index 0000000000..d5bb630155 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServiceOrphanRpcsITCase.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest; +import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse; +import org.apache.fluss.rpc.messages.PbKvSnapshot; +import org.apache.fluss.rpc.messages.PbRemoteLogManifestEntry; +import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; +import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle; +import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore; +import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore; +import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; +import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.server.testutils.RpcMessageTestUtils; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.apache.fluss.shaded.guava32.com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for orphan-cleanup-related coordinator RPCs: ListRemoteLogManifests, ListKvSnapshots. */ +class CoordinatorServiceOrphanRpcsITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder().setNumOfTabletServers(1).build(); + + private static final TablePath PK_TABLE_PATH = TablePath.of("orphan_rpc_test_db", "pk_table"); + private static final TableDescriptor PK_TABLE_DESCRIPTOR = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .withComment("pk col") + .column("val", DataTypes.STRING()) + .withComment("value col") + .primaryKey("id") + .build()) + .distributedBy(3, "id") + .build(); + + private static final int MAX_SNAPSHOTS_TO_RETAIN = 2; + + private static long pkTableId; + @TempDir static Path tempDir; + + @BeforeAll + static void setupTables() throws Exception { + pkTableId = + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, PK_TABLE_PATH, PK_TABLE_DESCRIPTOR); + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(pkTableId); + } + + // ------------------------------------------------------------------------- + // ListRemoteLogManifests + // ------------------------------------------------------------------------- + + @Test + void listManifestsForUnpartitionedTable() throws Exception { + long tableId = 999_111L; + ZooKeeperClient zk = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + zk.upsertRemoteLogManifestHandle( + new TableBucket(tableId, 0), + new RemoteLogManifestHandle(new FsPath("oss://bucket-0/m.manifest"), 100L)); + zk.upsertRemoteLogManifestHandle( + new TableBucket(tableId, 1), + new RemoteLogManifestHandle(new FsPath("oss://bucket-1/m.manifest"), 200L)); + + CoordinatorGateway gateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + ListRemoteLogManifestsRequest request = new ListRemoteLogManifestsRequest(); + request.setTableId(tableId); + ListRemoteLogManifestsResponse response = gateway.listRemoteLogManifests(request).get(); + + assertThat(response.getManifestsList()).hasSize(2); + assertThat(response.getManifestsList()) + .extracting(PbRemoteLogManifestEntry::getRemoteLogEndOffset) + .containsExactlyInAnyOrder(100L, 200L); + } + + @Test + void listManifestsForUnknownTableReturnsEmpty() throws Exception { + CoordinatorGateway gateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + ListRemoteLogManifestsRequest request = new ListRemoteLogManifestsRequest(); + request.setTableId(999_999L); + ListRemoteLogManifestsResponse response = gateway.listRemoteLogManifests(request).get(); + + assertThat(response.getManifestsList()).isEmpty(); + } + + // ------------------------------------------------------------------------- + // ListKvSnapshots + // ------------------------------------------------------------------------- + + @Test + void listSnapshotsReturnsRetainedSubset() throws Exception { + TableBucket tb = new TableBucket(pkTableId, 0); + CompletedSnapshotStore store = createTestStore(snapshot -> false); + for (long snapId = 1L; snapId <= 4L; snapId++) { + store.add(createMinimalSnapshot(tb, snapId)); + } + injectStore(tb, store); + + CoordinatorGateway gateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + ListKvSnapshotsRequest request = new ListKvSnapshotsRequest(); + request.setTableId(pkTableId); + ListKvSnapshotsResponse response = gateway.listKvSnapshots(request).get(); + + assertThat(response.getTableId()).isEqualTo(pkTableId); + assertThat(response.hasPartitionId()).isFalse(); + List bucket0 = + response.getActiveSnapshotsList().stream() + .filter(s -> s.getBucketId() == 0) + .collect(Collectors.toList()); + assertThat(bucket0) + .extracting(PbKvSnapshot::getSnapshotId) + .containsExactlyInAnyOrder(3L, 4L); + } + + @Test + void listSnapshotsReturnsStillInUseBeyondRetention() throws Exception { + TableBucket tb = new TableBucket(pkTableId, 1); + CompletedSnapshotStore store = createTestStore(snapshot -> snapshot.getSnapshotID() == 1L); + for (long snapId = 1L; snapId <= 4L; snapId++) { + store.add(createMinimalSnapshot(tb, snapId)); + } + injectStore(tb, store); + + CoordinatorGateway gateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + ListKvSnapshotsRequest request = new ListKvSnapshotsRequest(); + request.setTableId(pkTableId); + ListKvSnapshotsResponse response = gateway.listKvSnapshots(request).get(); + + List bucket1Snapshots = + response.getActiveSnapshotsList().stream() + .filter(s -> s.getBucketId() == 1) + .collect(Collectors.toList()); + // RETAINED = {3, 4}, STILL_IN_USE = {1} + assertThat(bucket1Snapshots) + .extracting(PbKvSnapshot::getSnapshotId) + .containsExactlyInAnyOrder(1L, 3L, 4L); + } + + @Test + void listSnapshotsForUnknownTableFails() { + CoordinatorGateway gateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + ListKvSnapshotsRequest request = new ListKvSnapshotsRequest(); + request.setTableId(999_888L); + + assertThatThrownBy(() -> gateway.listKvSnapshots(request).get()) + .isInstanceOf(ExecutionException.class); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + private CompletedSnapshotStore createTestStore( + CompletedSnapshotStore.SnapshotInUseChecker inUseChecker) { + return new CompletedSnapshotStore( + MAX_SNAPSHOTS_TO_RETAIN, + new SharedKvFileRegistry(), + Collections.emptyList(), + new NoOpSnapshotHandleStore(), + directExecutor(), + inUseChecker); + } + + private void injectStore(TableBucket tb, CompletedSnapshotStore store) { + CompletedSnapshotStoreManager storeManager = + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorEventProcessor() + .completedSnapshotStoreManager(); + storeManager.getBucketCompletedSnapshotStores().put(tb, store); + } + + private CompletedSnapshot createMinimalSnapshot(TableBucket tb, long snapshotId) { + Path snapDir = tempDir.resolve("snap-" + tb.getBucket() + "-" + snapshotId); + snapDir.toFile().mkdirs(); + return new CompletedSnapshot( + tb, + snapshotId, + new FsPath(snapDir.resolve("_metadata").toUri().toString()), + new KvSnapshotHandle(Collections.emptyList(), Collections.emptyList(), 0L), + snapshotId, + null, + null); + } + + /** No-op implementation that avoids ZK writes during test snapshot population. */ + private static class NoOpSnapshotHandleStore implements CompletedSnapshotHandleStore { + + @Override + public void add( + TableBucket tableBucket, + long snapshotId, + CompletedSnapshotHandle completedSnapshotHandle) {} + + @Override + public void remove(TableBucket tableBucket, long snapshotId) {} + + @Override + public Optional get(TableBucket tableBucket, long snapshotId) { + return Optional.empty(); + } + + @Override + public List getAllCompletedSnapshotHandles( + TableBucket tableBucket) { + return Collections.emptyList(); + } + + @Override + public Optional getLatestCompletedSnapshotHandle( + TableBucket tableBucket) { + return Optional.empty(); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 8530ad4e7a..7f3bc32e8c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -94,10 +94,14 @@ import org.apache.fluss.rpc.messages.ListAclsResponse; import org.apache.fluss.rpc.messages.ListDatabasesRequest; import org.apache.fluss.rpc.messages.ListDatabasesResponse; +import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest; +import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest; +import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -267,6 +271,18 @@ public CompletableFuture listPartitionInfos( return null; } + @Override + public CompletableFuture listRemoteLogManifests( + ListRemoteLogManifestsRequest request) { + return null; + } + + @Override + public CompletableFuture listKvSnapshots( + ListKvSnapshotsRequest request) { + return null; + } + @Override public CompletableFuture getLakeSnapshot( GetLakeSnapshotRequest request) {