Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ public String currentListenerName() {

/** Shutdown the gateway service, release any resources. */
public abstract void shutdown();

/**
* Tries to complete all pending delayed actions.
*
* <p>This default implementation is a no-op, which suits services that have no delayed action
* queue. Services that do queue delayed actions can override it to run them.
*/
public void tryCompleteActions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public void processRequest(FlussRequest request) {
} catch (Throwable t) {
LOG.debug("Error while executing RPC {}", api, t);
request.fail(stripException(t, InvocationTargetException.class));
} finally {
service.tryCompleteActions();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
import org.apache.fluss.server.metrics.group.TableMetricGroup;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.replica.delay.ActionQueue;
import org.apache.fluss.server.replica.delay.DelayedActionQueue;
import org.apache.fluss.server.replica.delay.DelayedFetchLog;
import org.apache.fluss.server.replica.delay.DelayedFetchLog.FetchBucketStatus;
import org.apache.fluss.server.replica.delay.DelayedOperationManager;
Expand Down Expand Up @@ -187,6 +189,8 @@ public class ReplicaManager implements ServerReconfigurable {
*/
private final DelayedOperationManager<DelayedFetchLog> delayedFetchLogManager;

private final ActionQueue actionQueue;

private final ReplicaFetcherManager replicaFetcherManager;
// The manager used to manager the replica alter, especially the isr expand and shrink.
private final AdjustIsrManager adjustIsrManager;
Expand Down Expand Up @@ -309,6 +313,7 @@ public ReplicaManager(
"delay fetch log",
serverId,
conf.getInt(ConfigOptions.LOG_REPLICA_FETCH_OPERATION_PURGE_NUMBER));
this.actionQueue = new DelayedActionQueue();
this.internalListenerName = conf.get(ConfigOptions.INTERNAL_LISTENER_NAME);

this.replicaFetcherManager =
Expand Down Expand Up @@ -636,6 +641,10 @@ public void appendRecordsToLog(
appendToLocalLog(entriesPerBucket, requiredAcks, userContext);
LOG.debug("Append records to local log in {} ms", System.currentTimeMillis() - startTime);

// Enqueue delayed fetch completions — not executed here.
// Framework layer invokes tryCompleteActions() after this method returns.
addCompletePurgatoryAction(appendResult);

// maybe do delay write operation.
maybeAddDelayedWrite(
timeoutMs, requiredAcks, entriesPerBucket.size(), appendResult, responseCallback);
Expand Down Expand Up @@ -1783,6 +1792,32 @@ private boolean isNonCriticalFetchError(Errors error) {
|| error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION;
}

/**
 * Enqueues one action that re-checks delayed fetch log operations for every bucket whose write
 * succeeded.
 *
 * <p>Nothing is completed by this method itself: the action is only handed to the {@link
 * ActionQueue}. It runs when {@link #tryCompleteActions()} is invoked, which the framework layer
 * is expected to do once the write path has fully finished.
 *
 * @param writeResults per-bucket write results; buckets whose result did not succeed are skipped
 *     when the action runs
 */
private void addCompletePurgatoryAction(
        Map<TableBucket, ? extends WriteResultForBucket> writeResults) {
    Runnable completeFetchesForWrittenBuckets =
            () -> {
                for (Map.Entry<TableBucket, ? extends WriteResultForBucket> bucketResult :
                        writeResults.entrySet()) {
                    // Failed writes added no data, so there is nothing new to fetch.
                    if (!bucketResult.getValue().succeeded()) {
                        continue;
                    }
                    DelayedTableBucketKey watchKey =
                            new DelayedTableBucketKey(bucketResult.getKey());
                    delayedFetchLogManager.checkAndComplete(watchKey);
                }
            };
    actionQueue.add(completeFetchesForWrittenBuckets);
}

/**
* Tries to complete all pending delayed actions in the action queue.
*
* <p>Delegates to {@link ActionQueue#tryCompleteActions()} on this manager's action queue, the
* same queue that {@code addCompletePurgatoryAction} enqueues into.
*/
public void tryCompleteActions() {
actionQueue.tryCompleteActions();
}

private void completeDelayedOperations(TableBucket tableBucket) {
DelayedTableBucketKey delayedTableBucketKey = new DelayedTableBucketKey(tableBucket);
delayedWriteManager.checkAndComplete(delayedTableBucketKey);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.replica.delay;

import org.apache.fluss.annotation.Internal;

/**
* A queue for collecting actions which need to be executed later.
*
* <p>This decouples the enqueuing of delayed operation completions from their execution. For
* example, after appending records, actions that complete delayed fetch operations are enqueued
* and are then executed once the write path is fully finished.
*
* @see DelayedActionQueue
*/
@Internal
public interface ActionQueue {

/**
* Adds an action to this queue. The action is meant to be executed by a later call to {@link
* #tryCompleteActions()}, not by this method.
*
* @param action the action to execute later
*/
void add(Runnable action);

/** Tries to complete all pending actions in the queue. */
void tryCompleteActions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.replica.delay;

import org.apache.fluss.annotation.Internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * {@link ActionQueue} implementation backed by a {@link ConcurrentLinkedQueue}.
 *
 * <p>{@link #add(Runnable)} appends an action to the queue. {@link #tryCompleteActions()} polls
 * actions off the queue and runs them on the calling thread. A single call runs at most as many
 * actions as the queue reported holding when the call started, and stops early if the queue is
 * found empty. An action that throws is logged and does not stop the remaining ones.
 */
@Internal
public class DelayedActionQueue implements ActionQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DelayedActionQueue.class);

    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void add(Runnable action) {
        queue.offer(action);
    }

    @Override
    public void tryCompleteActions() {
        // Upper bound for this call: the number of actions pending on entry.
        final int budget = queue.size();
        for (int executed = 0; executed < budget; executed++) {
            final Runnable next = queue.poll();
            if (next == null) {
                // Queue is already empty, e.g. drained by another caller.
                return;
            }
            runAndLogFailure(next);
        }
    }

    /** Runs one action, logging anything it throws instead of propagating it. */
    private static void runAndLogFailure(Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            LOG.error("Failed to complete delayed action.", t);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public String name() {
/** No-op: this implementation performs no work on shutdown. */
@Override
public void shutdown() {}

/**
* Tries to complete pending delayed actions by delegating to {@code
* replicaManager.tryCompleteActions()}.
*/
@Override
public void tryCompleteActions() {
replicaManager.tryCompleteActions();
}

@Override
public CompletableFuture<ProduceLogResponse> produceLog(ProduceLogRequest request) {
authorizeTable(WRITE, request.getTableId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ void testCompleteDelayedFetchLog() throws Exception {
assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(1);
assertThat(delayedFetchLogManager.watched()).isEqualTo(1);

// write data.
// Produce data — appendRecordsToLog enqueues a checkAndComplete action,
// which is executed when tryCompleteActions() is called (normally by framework layer).
assertThat(delayedResponse.isDone()).isFalse();
CompletableFuture<List<ProduceLogResultForBucket>> future = new CompletableFuture<>();
replicaManager.appendRecordsToLog(
Expand All @@ -92,11 +93,12 @@ void testCompleteDelayedFetchLog() throws Exception {
Collections.singletonMap(tb, genMemoryLogRecordsByObject(DATA1)),
null,
future::complete);
// Simulate framework layer trigger (FlussRequestHandler calls this after invoke).
replicaManager.tryCompleteActions();
assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L));

// check and complete manually
numComplete = delayedFetchLogManager.checkAndComplete(delayedTableBucketKey);
assertThat(numComplete).isEqualTo(1);
// The delayed fetch log should already be completed by tryCompleteActions above.
assertThat(delayedResponse.isDone()).isTrue();
assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0);
assertThat(delayedFetchLogManager.watched()).isEqualTo(0);

Expand All @@ -107,6 +109,58 @@ void testCompleteDelayedFetchLog() throws Exception {
assertLogRecordsEquals(DATA1_ROW_TYPE, resultForBucket.records(), DATA1);
}

/**
 * Verifies that a watched delayed fetch is completed by {@code tryCompleteActions()} after a
 * produce to the same bucket, without any manual {@code checkAndComplete} call.
 */
@Test
void testProduceAutoCompletesDelayedFetchLog() throws Exception {
    TableBucket bucket = new TableBucket(DATA1_TABLE_ID, 1);
    makeLogTableAsLeader(bucket.getBucket());

    // Register a delayed fetch with follower-like params (LOG_END isolation, minBytes=1)
    // whose initial read returned no records.
    CompletableFuture<Map<TableBucket, FetchLogResultForBucket>> fetchResponse =
            new CompletableFuture<>();
    FetchBucketStatus initialStatus =
            new FetchBucketStatus(
                    new FetchReqInfo(150001L, 0L, Integer.MAX_VALUE),
                    new LogOffsetMetadata(0L, 0L, 0),
                    new FetchLogResultForBucket(bucket, MemoryLogRecords.EMPTY, 0L));
    DelayedFetchLog pendingFetch =
            createDelayedFetchLogRequest(
                    bucket,
                    1, // minFetchBytes = 1, like follower fetch
                    Duration.ofMinutes(3).toMillis(),
                    initialStatus,
                    fetchResponse::complete);

    DelayedOperationManager<DelayedFetchLog> fetchPurgatory =
            replicaManager.getDelayedFetchLogManager();
    fetchPurgatory.tryCompleteElseWatch(
            pendingFetch, Collections.singletonList(new DelayedTableBucketKey(bucket)));
    assertThat(fetchPurgatory.numDelayed()).isEqualTo(1);
    assertThat(fetchResponse.isDone()).isFalse();

    // Produce with acks=1: the append enqueues the fetch-completion action.
    CompletableFuture<List<ProduceLogResultForBucket>> produceAck = new CompletableFuture<>();
    replicaManager.appendRecordsToLog(
            20000,
            1,
            Collections.singletonMap(bucket, genMemoryLogRecordsByObject(DATA1)),
            null,
            produceAck::complete);
    // Stand in for the framework layer, which triggers this after handling the request.
    replicaManager.tryCompleteActions();
    assertThat(produceAck.get()).containsOnly(new ProduceLogResultForBucket(bucket, 0, 10L));

    // Running the queued action must have completed the delayed fetch.
    assertThat(fetchResponse.isDone()).isTrue();
    assertThat(fetchPurgatory.numDelayed()).isEqualTo(0);

    // The completed fetch returns exactly the records that were just produced.
    FetchLogResultForBucket fetched = fetchResponse.get().get(bucket);
    assertThat(fetched).isNotNull();
    assertLogRecordsEquals(DATA1_ROW_TYPE, fetched.records(), DATA1);
}

@Test
void testDelayFetchLogTimeout() {
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
Expand Down