Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void beginFlush() {
}

/** Mark all buckets as ready to send and block until to send is complete. */
public void awaitFlushCompletion() throws InterruptedException {
public void awaitFlushCompletion() throws Exception {
try {
// Obtain a copy of all the incomplete write request result(s) at the time of the
// flush. We must be careful not to hold a reference to the ProduceBatch(s) so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ private boolean canRetry(ReadyWriteBatch readyWriteBatch, Errors error) {
WriteBatch batch = readyWriteBatch.writeBatch();
return batch.attempts() < retries
&& !batch.isDone()
&& batch.waitedTimeMs(System.currentTimeMillis()) < maxRequestTimeoutMs
&& ((error.exception() instanceof RetriableException)
|| (idempotenceManager.idempotenceEnabled()
&& idempotenceManager.canRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private void completeFutureAndFireCallbacks(
e);
}
});
requestFuture.done();
requestFuture.done(exception);
}

int attempts() {
Expand Down Expand Up @@ -306,15 +306,20 @@ private enum FinalState {
/** The future for this batch. */
public static class RequestFuture {
private final CountDownLatch latch = new CountDownLatch(1);
@Nullable private Exception exception;

/** Mark this request as complete and unblock any threads waiting on its completion. */
public void done() {
public void done(@Nullable Exception exception) {
this.exception = exception;
latch.countDown();
}

/** Await the completion of this request. */
public void await() throws InterruptedException {
public void await() throws Exception {
latch.await();
if (exception != null) {
throw exception;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ public void flush() {
"Flush interrupted after %d ms. Writer may be in inconsistent state",
System.currentTimeMillis() - start),
e);
} catch (Exception e) {
throw new FlussRuntimeException(
String.format("Flush failed after %d ms.", System.currentTimeMillis() - start),
e);
}
LOG.trace(
"Flushed accumulated records in writer in {} ms.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -463,6 +464,34 @@ void testFlush() throws Exception {
assertThat(accum.hasIncomplete()).isFalse();
}

@Test
void testAwaitFlushCompletionPropagatesBatchFailure() throws Exception {
IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
RecordAccumulator accum = createTestRecordAccumulator(4 * 1024, 64 * 1024);
accum.append(createRecord(row), writeCallback, cluster, 0, false);

accum.beginFlush();
CompletableFuture<Throwable> flushResult =
CompletableFuture.supplyAsync(
() -> {
try {
accum.awaitFlushCompletion();
return null;
} catch (Throwable t) {
return t;
}
});

Map<Integer, List<ReadyWriteBatch>> results =
accum.drain(cluster, accum.ready(cluster).readyNodes, Integer.MAX_VALUE);
WriteBatch batch = results.get(node1.id()).get(0).writeBatch();
RuntimeException expected = new RuntimeException("write failed");
batch.completeExceptionally(expected);

assertThat(flushResult.get(5, TimeUnit.SECONDS)).isSameAs(expected);
accum.deallocate(batch);
}

@Test
void testTableWithUnknownLeader() throws Exception {
int batchSize = 100;
Expand Down