Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
Expand Down Expand Up @@ -796,4 +798,23 @@ CompletableFuture<RegisterResult> registerProducerOffsets(
* @since 1.0
*/
CompletableFuture<ClusterHealth> getClusterHealth();

/**
* List per-bucket remote log manifest entries for a table or partition scope.
*
* @param tableId the table to query
* @param partitionId optional partition id (null for non-partitioned tables)
* @return per-bucket manifest paths and end offsets
*/
CompletableFuture<List<RemoteLogManifestInfo>> listRemoteLogManifests(
long tableId, @Nullable Long partitionId);

/**
* List per-bucket active KV snapshot ids for a table or partition scope.
*
* @param tableId the table to query
* @param partitionId optional partition id (null for non-partitioned tables)
* @return per-bucket active snapshot ids grouped by bucket
*/
CompletableFuture<ActiveKvSnapshots> listKvSnapshots(long tableId, @Nullable Long partitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
Expand Down Expand Up @@ -81,10 +83,12 @@
import org.apache.fluss.rpc.messages.ListAclsRequest;
import org.apache.fluss.rpc.messages.ListDatabasesRequest;
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListOffsetsResponse;
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.ListTablesResponse;
import org.apache.fluss.rpc.messages.PbAlterConfig;
Expand Down Expand Up @@ -392,6 +396,36 @@ public CompletableFuture<List<PartitionInfo>> listPartitionInfos(
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
}

/**
* Returns per-bucket remote log manifest path for the given table or partition.
*
* <p>Used by the orphan cleanup action to construct the active manifest path set without
* relying on FS LIST + mtime selection.
*/
@Override
public CompletableFuture<List<RemoteLogManifestInfo>> listRemoteLogManifests(
long tableId, @Nullable Long partitionId) {
ListRemoteLogManifestsRequest request = new ListRemoteLogManifestsRequest();
request.setTableId(tableId);
if (partitionId != null) {
request.setPartitionId(partitionId);
}
return gateway.listRemoteLogManifests(request)
.thenApply(ClientRpcMessageUtils::toRemoteLogManifestInfos);
}

@Override
public CompletableFuture<ActiveKvSnapshots> listKvSnapshots(
long tableId, @Nullable Long partitionId) {
ListKvSnapshotsRequest request = new ListKvSnapshotsRequest();
request.setTableId(tableId);
if (partitionId != null) {
request.setPartitionId(partitionId);
}
return gateway.listKvSnapshots(request)
.thenApply(ClientRpcMessageUtils::toActiveKvSnapshots);
}

@Override
public CompletableFuture<Void> createPartition(
TablePath tablePath, PartitionSpec partitionSpec, boolean ignoreIfExists) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.metadata;

import org.apache.fluss.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* Per-bucket active KV snapshot ids for a table or partition scope. The active set is the union of
* retained snapshots and lease-pinned (still-in-use) snapshots reported by the coordinator.
*
* @since 1.0
*/
@PublicEvolving
public final class ActiveKvSnapshots {

private final long tableId;
@Nullable private final Long partitionId;
private final Map<Integer, Set<Long>> snapshotIdsByBucket;

public ActiveKvSnapshots(
long tableId, @Nullable Long partitionId, Map<Integer, Set<Long>> snapshotIdsByBucket) {
this.tableId = tableId;
this.partitionId = partitionId;
Map<Integer, Set<Long>> copy = new HashMap<>();
for (Map.Entry<Integer, Set<Long>> entry : snapshotIdsByBucket.entrySet()) {
copy.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
}
this.snapshotIdsByBucket = Collections.unmodifiableMap(copy);
}

public long getTableId() {
return tableId;
}

@Nullable
public Long getPartitionId() {
return partitionId;
}

public Map<Integer, Set<Long>> getSnapshotIdsByBucket() {
return snapshotIdsByBucket;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.metadata;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.metadata.TableBucket;

/**
* A single remote log manifest entry returned by the coordinator. Each entry maps a {@link
* TableBucket} to its current manifest file path and the end offset covered by that manifest.
*
* @since 1.0
*/
@PublicEvolving
public final class RemoteLogManifestInfo {

private final TableBucket tableBucket;
private final String remoteLogManifestPath;
private final long remoteLogEndOffset;

public RemoteLogManifestInfo(
TableBucket tableBucket, String remoteLogManifestPath, long remoteLogEndOffset) {
this.tableBucket = tableBucket;
this.remoteLogManifestPath = remoteLogManifestPath;
this.remoteLogEndOffset = remoteLogEndOffset;
}

public TableBucket getTableBucket() {
return tableBucket;
}

public String getRemoteLogManifestPath() {
return remoteLogManifestPath;
}

public long getRemoteLogEndOffset() {
return remoteLogEndOffset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.fluss.client.lookup.LookupBatch;
import org.apache.fluss.client.lookup.PrefixLookupBatch;
import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
import org.apache.fluss.client.write.KvWriteBatch;
import org.apache.fluss.client.write.ReadyWriteBatch;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
Expand Down Expand Up @@ -61,9 +63,11 @@
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
import org.apache.fluss.rpc.messages.GetTableStatsRequest;
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse;
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.PbAddColumn;
Expand All @@ -87,6 +91,7 @@
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket;
import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
import org.apache.fluss.rpc.messages.PbRemoteLogManifestEntry;
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
import org.apache.fluss.rpc.messages.PbRenameColumn;
import org.apache.fluss.rpc.messages.PbTableBucket;
Expand All @@ -106,6 +111,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -536,6 +542,34 @@ public static ReleaseKvSnapshotLeaseRequest makeReleaseKvSnapshotLeaseRequest(
return request;
}

public static List<RemoteLogManifestInfo> toRemoteLogManifestInfos(
ListRemoteLogManifestsResponse response) {
List<RemoteLogManifestInfo> result = new ArrayList<>(response.getManifestsCount());
for (PbRemoteLogManifestEntry entry : response.getManifestsList()) {
PbTableBucket pb = entry.getTableBucket();
Long partitionId = pb.hasPartitionId() ? pb.getPartitionId() : null;
TableBucket tableBucket =
new TableBucket(pb.getTableId(), partitionId, pb.getBucketId());
result.add(
new RemoteLogManifestInfo(
tableBucket,
entry.getRemoteLogManifestPath(),
entry.getRemoteLogEndOffset()));
}
return result;
}

public static ActiveKvSnapshots toActiveKvSnapshots(ListKvSnapshotsResponse response) {
Map<Integer, Set<Long>> snapshotIdsByBucket = new HashMap<>();
for (PbKvSnapshot snapshot : response.getActiveSnapshotsList()) {
snapshotIdsByBucket
.computeIfAbsent(snapshot.getBucketId(), k -> new HashSet<>())
.add(snapshot.getSnapshotId());
}
Long partitionId = response.hasPartitionId() ? response.getPartitionId() : null;
return new ActiveKvSnapshots(response.getTableId(), partitionId, snapshotIdsByBucket);
}

public static Optional<RebalanceProgress> toRebalanceProgress(
ListRebalanceProgressResponse response) {
if (!response.hasRebalanceId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.admin.ProducerOffsetsResult;
import org.apache.fluss.client.admin.RegisterResult;
import org.apache.fluss.client.metadata.ActiveKvSnapshots;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.RemoteLogManifestInfo;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
Expand Down Expand Up @@ -323,4 +325,16 @@ public CompletableFuture<LakeSnapshot> getReadableLakeSnapshot(TablePath tablePa
public CompletableFuture<ClusterHealth> getClusterHealth() {
throw new UnsupportedOperationException("Not implemented in TestAdminAdapter");
}

@Override
public CompletableFuture<List<RemoteLogManifestInfo>> listRemoteLogManifests(
long tableId, @Nullable Long partitionId) {
throw new UnsupportedOperationException("Not implemented in TestAdminAdapter");
}

@Override
public CompletableFuture<ActiveKvSnapshots> listKvSnapshots(
long tableId, @Nullable Long partitionId) {
throw new UnsupportedOperationException("Not implemented in TestAdminAdapter");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@
import org.apache.fluss.rpc.messages.DropTableResponse;
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
import org.apache.fluss.rpc.messages.ListKvSnapshotsRequest;
import org.apache.fluss.rpc.messages.ListKvSnapshotsResponse;
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsRequest;
import org.apache.fluss.rpc.messages.ListRemoteLogManifestsResponse;
import org.apache.fluss.rpc.messages.RebalanceRequest;
import org.apache.fluss.rpc.messages.RebalanceResponse;
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
Expand Down Expand Up @@ -216,4 +220,29 @@ CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease(

// todo: rename table & alter table

// ==================================================================================
// Orphan Cleanup RPCs (coordinator-only, internal)
// ==================================================================================

/**
* List remote log manifest entries for all buckets of a table or single partition.
*
* @param request request with table_id and optional partition_id
* @return per-bucket manifest path and end offset
*/
@RPC(api = ApiKeys.LIST_REMOTE_LOG_MANIFESTS)
CompletableFuture<ListRemoteLogManifestsResponse> listRemoteLogManifests(
ListRemoteLogManifestsRequest request);

/**
* List active KV snapshot ids for a (tableId, partitionId) unit. The response is the union of
* (a) snapshots currently held by the in-memory {@code CompletedSnapshotStore} for each bucket
* and (b) snapshots still pinned by an active KV snapshot lease. No retention truncation is
* applied — every snapshot the coordinator has not yet pruned from ZK is reported as active so
* orphan cleanup never misdeletes a still-referenced snapshot. The server emits one entry per
* active {@code (bucket_id, snapshot_id)} pair with no source discriminator; callers must treat
* the entire response as the active set.
*/
@RPC(api = ApiKeys.LIST_KV_SNAPSHOTS)
CompletableFuture<ListKvSnapshotsResponse> listKvSnapshots(ListKvSnapshotsRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public enum ApiKeys {
GET_TABLE_STATS(1059, 0, 0, PUBLIC),
ALTER_DATABASE(1060, 0, 0, PUBLIC),
SCAN_KV(1061, 0, 0, PUBLIC),
GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC);
GET_CLUSTER_HEALTH(1062, 0, 0, PUBLIC),
LIST_REMOTE_LOG_MANIFESTS(1063, 0, 0, PUBLIC),
LIST_KV_SNAPSHOTS(1064, 0, 0, PUBLIC);

private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
Expand Down
Loading