diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 7a0e0f5dd2..445972940f 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -375,7 +375,7 @@ public void beginFlush() { } /** Mark all buckets as ready to send and block until to send is complete. */ - public void awaitFlushCompletion() throws InterruptedException { + public void awaitFlushCompletion() throws Exception { try { // Obtain a copy of all the incomplete write request result(s) at the time of the // flush. We must be careful not to hold a reference to the ProduceBatch(s) so that diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index a8e62cd370..0a49df8dfb 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -321,6 +321,7 @@ private boolean canRetry(ReadyWriteBatch readyWriteBatch, Errors error) { WriteBatch batch = readyWriteBatch.writeBatch(); return batch.attempts() < retries && !batch.isDone() + && batch.waitedTimeMs(System.currentTimeMillis()) < maxRequestTimeoutMs && ((error.exception() instanceof RetriableException) || (idempotenceManager.idempotenceEnabled() && idempotenceManager.canRetry( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java index 9702bb5438..c84ff1c70b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java @@ -219,7 +219,7 @@ private void completeFutureAndFireCallbacks( e); } }); - requestFuture.done(); + requestFuture.done(exception); } int attempts() { @@ -306,15 +306,20 @@ private enum FinalState { /** The future for this batch. */ public static class RequestFuture { private final CountDownLatch latch = new CountDownLatch(1); + @Nullable private Exception exception; /** Mark this request as complete and unblock any threads waiting on its completion. */ - public void done() { + public void done(@Nullable Exception exception) { + this.exception = exception; latch.countDown(); } /** Await the completion of this request. */ - public void await() throws InterruptedException { + public void await() throws Exception { latch.await(); + if (exception != null) { + throw exception; + } } } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java index b4c96b0ac4..ea244f2d70 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java @@ -164,6 +164,10 @@ public void flush() { "Flush interrupted after %d ms. Writer may be in inconsistent state", System.currentTimeMillis() - start), e); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format("Flush failed after %d ms.", System.currentTimeMillis() - start), + e); } LOG.trace( "Flushed accumulated records in writer in {} ms.", diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 0e4ab9c97f..7b899d7918 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -65,6 +65,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -463,6 +464,34 @@ void testFlush() throws Exception { assertThat(accum.hasIncomplete()).isFalse(); } + @Test + void testAwaitFlushCompletionPropagatesBatchFailure() throws Exception { + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + RecordAccumulator accum = createTestRecordAccumulator(4 * 1024, 64 * 1024); + accum.append(createRecord(row), writeCallback, cluster, 0, false); + + accum.beginFlush(); + CompletableFuture flushResult = + CompletableFuture.supplyAsync( + () -> { + try { + accum.awaitFlushCompletion(); + return null; + } catch (Throwable t) { + return t; + } + }); + + Map> results = + accum.drain(cluster, accum.ready(cluster).readyNodes, Integer.MAX_VALUE); + WriteBatch batch = results.get(node1.id()).get(0).writeBatch(); + RuntimeException expected = new RuntimeException("write failed"); + batch.completeExceptionally(expected); + + assertThat(flushResult.get(5, TimeUnit.SECONDS)).isSameAs(expected); + accum.deallocate(batch); + } + @Test void testTableWithUnknownLeader() throws Exception { int batchSize = 100;