From 58af6db3b47a82754cd4e900a64e59b6e670557a Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 9 Jun 2026 14:45:43 +0200 Subject: [PATCH 01/10] fix(qwp): fix missed terminal send errors during Sender close Fixes a race where QwpWebSocketSender.close() could return successfully even after a terminal QWP/WebSocket error was latched during close. **implementation details for reviewers** Track the exact `lastError` instance that `checkError()` has synchronously surfaced and make `QwpWebSocketSender.close()` suppress only that pre-owned instance. This avoids treating a freshly latched terminal error as already owned when it appears between separate `hasUnsurfacedError()` and `getLastError()` reads. --- .../qwp/client/QwpWebSocketSender.java | 12 +++---- .../sf/cursor/CursorWebSocketSendLoop.java | 36 +++++++++++-------- ...CursorWebSocketSendLoopErrorLatchTest.java | 9 +++++ 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 7d43d72f..ce78431e 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -962,14 +962,14 @@ public void close() { // SCHEMA_MISMATCH HALT) from users who only call close() and // never call flush() afterwards. Throwable terminalError = null; - // Snapshot the latched terminal error that the user thread has - // ALREADY caught (via flush()/at()) before close() ran. If - // flushPendingRows/drainOnClose below also rethrow the same + // Snapshot the exact terminal error instance that a user-thread + // API call ALREADY caught (via flush()/at()) before close() ran. + // If flushPendingRows/drainOnClose below also rethrow the same // instance, dropping it at the final rethrow avoids // try-with-resources self-suppression: Throwable.addSuppressed // raises IllegalArgumentException when primary == suppressed. - Throwable alreadyOwnedByUser = (cursorSendLoop != null && !cursorSendLoop.hasUnsurfacedError()) - ? cursorSendLoop.getLastError() : null; + Throwable alreadyOwnedByUser = cursorSendLoop != null + ? cursorSendLoop.getSynchronouslySurfacedError() : null; try { // Only drain when both the engine and the I/O loop are wired @@ -991,7 +991,7 @@ public void close() { // only when no other channel has already delivered it // to the user. "Already delivered" means either the // producer thread saw it synchronously via - // flush()/append() (errorSurfacedSynchronously) or the + // flush()/append() (synchronouslySurfacedError) or the // async dispatcher delivered it to a user-installed // custom handler at any point in this sender's life // (deliveredToCustomHandler). The latter survives a diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 9f7182da..2e9ce6de 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -170,14 +170,10 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // directly to the same dispatcher from QwpWebSocketSender. private volatile SenderConnectionDispatcher connectionDispatcher; private volatile SenderErrorDispatcher errorDispatcher; - // Set by checkError() the first time it actually rethrows lastError to a - // synchronous user-thread caller (flush/append/close). close() consults - // this to decide whether to rethrow the latched terminal -- if a producer - // thread already saw the error from a flush() call, throwing again from - // close() would mask any in-flight test assertion or user exception. The - // async dispatcher path does NOT set this flag: a user who only watches - // the async error inbox still gets a loud failure on shutdown. - private volatile boolean errorSurfacedSynchronously; + // Exact lastError instance that checkError() has thrown to a synchronous + // user-thread caller (flush/append/close). close() uses the instance so it + // only suppresses errors the user already owned before close() began. + private volatile Throwable synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -472,7 +468,7 @@ public static boolean isTerminalCloseCode(int code) { public void checkError() { Throwable e = lastError; if (e != null) { - errorSurfacedSynchronously = true; + synchronouslySurfacedError = e; if (e instanceof LineSenderException) throw (LineSenderException) e; throw new LineSenderException("I/O thread failed: " + e.getMessage(), e); } @@ -528,6 +524,15 @@ public Throwable getLastError() { return lastError; } + /** + * Returns the exact latched throwable instance already thrown by + * {@link #checkError()}, or {@code null} when no synchronous caller has + * owned the terminal error yet. + */ + public Throwable getSynchronouslySurfacedError() { + return synchronouslySurfacedError; + } + /** * Snapshot of the typed server-rejection payload for the latched terminal error, * or {@code null} if the loop has not latched a server-rejection terminal (or has @@ -607,14 +612,15 @@ public boolean hasEverConnected() { /** * True when {@link #lastError} is set AND no synchronous user-thread - * caller has yet seen it via {@link #checkError()}. close() uses this - * to decide whether to rethrow as a safety net: a user who only ever - * called close() (e.g. async-initial-connect that never reached the - * server) needs to see the error from somewhere; a user who already - * caught it from flush() does not. + * caller has yet seen that same instance via {@link #checkError()}. + * close() uses this to decide whether to rethrow as a safety net: a user + * who only ever called close() (e.g. async-initial-connect that never + * reached the server) needs to see the error from somewhere; a user who + * already caught it from flush() does not. */ public boolean hasUnsurfacedError() { - return lastError != null && !errorSurfacedSynchronously; + Throwable e = lastError; + return e != null && synchronouslySurfacedError != e; } public boolean isRunning() { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index 8b86c006..56ae0577 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -54,6 +54,7 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { SenderError err = newSenderError(); LineSenderServerException original = new LineSenderServerException(err); setField(loop, "lastError", original); + Assert.assertNull(loop.getSynchronouslySurfacedError()); try { loop.checkError(); @@ -64,6 +65,10 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { Assert.assertSame(err, ((LineSenderServerException) thrown).getServerError()); } + Assert.assertSame("checkError must mark the exact latched throwable as user-owned", + original, loop.getSynchronouslySurfacedError()); + Assert.assertFalse("a synchronously surfaced latch is no longer unsurfaced", + loop.hasUnsurfacedError()); } @Test @@ -74,6 +79,7 @@ public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Throwable raw = new RuntimeException("oh no"); setField(loop, "lastError", raw); + Assert.assertNull(loop.getSynchronouslySurfacedError()); try { loop.checkError(); @@ -83,6 +89,9 @@ public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { Assert.assertEquals(raw, thrown.getCause()); Assert.assertTrue(thrown.getMessage().contains("oh no")); } + Assert.assertSame("wrapped throwables are owned by their latched source instance", + raw, loop.getSynchronouslySurfacedError()); + Assert.assertFalse(loop.hasUnsurfacedError()); } @Test From e6088bd7d5121e8d9fc0330597eb9bfd0984ac00 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 15:50:17 +0200 Subject: [PATCH 02/10] probabilistic test --- .../sf/cursor/CloseOwnershipRaceTest.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java new file mode 100644 index 00000000..eb4d161c --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -0,0 +1,86 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client.sf.cursor; + +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorWebSocketSendLoop; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Regression guard for the close()-ownership race in {@code QwpWebSocketSender.close()}: + * its {@code alreadyOwnedByUser} snapshot was once computed from two separate volatile + * reads ({@code !hasUnsurfacedError() ? getLastError() : null}), so a terminal error + * latched by the I/O thread between the reads was mis-captured as already-owned and + * silently dropped on close(). The fix is the single read + * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}; this test races that + * read against real latch transitions and fails on any torn snapshot. + */ +public class CloseOwnershipRaceTest { + + private static final int ROUNDS = 1000; + + @Rule + public final TemporaryFolder sfDir = TemporaryFolder.builder().assureDeletion().build(); + + @Test(timeout = 60_000) + public void closeOwnershipSnapshotNeverClaimsAnUnsurfacedError() { + try (CursorSendEngine engine = new CursorSendEngine( + sfDir.getRoot().getAbsolutePath(), 16_384)) { + Throwable leaked = null; + for (int i = 0; i < ROUNDS && leaked == null; i++) { + // A null client, a reconnect factory that never produces one, + // and a zero reconnect budget: start()'s real I/O thread walks + // the production async-initial-connect path and latches a + // genuine RECONNECT_BUDGET_EXHAUSTED terminal within + // microseconds. One authentic null->error latch transition + // per round. + CursorWebSocketSendLoop loop = new CursorWebSocketSendLoop( + null, engine, 0, 1_000_000L, + () -> null, + 0, // reconnect budget: exhausted on arrival + 1, 1); + loop.start(); + // Race close()'s exact ownership snapshot against the latch + // transition, stopping once the latch has landed. Nothing in + // this test calls checkError(), so no synchronous caller ever + // owns the error and the snapshot must stay null. On fixed + // code it reads a field nobody here writes — it cannot flake; + // a reintroduced two-read snapshot tears when the latch lands + // between its reads, with near-certainty across all rounds. + while (leaked == null && loop.getLastError() == null) { + leaked = loop.getSynchronouslySurfacedError(); + } + loop.close(); + } + Assert.assertNull( + "close() captured a terminal error as 'already owned by user' that no " + + "synchronous caller ever saw -- close() would silently drop it: " + leaked, + leaked); + } + } +} From 2b0e443765f776d1316aa24e844a37eb7c65ec8c Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 17:47:09 +0200 Subject: [PATCH 03/10] Simplify the cursor loop's terminal-error latch The latch leaked its invariants into callers: close() had to compose hasUnsurfacedError() + checkError() for the safety-net rethrow, and checkError() minted a fresh wrapper per call for raw causes, so drainOnClose() could rethrow an error the user had already caught. - recordFatal() wraps non-LineSenderException causes once, at latch time: every rethrow delivers the same instance, making close()'s identity suppression correct in every state. - checkUnsurfacedError() encapsulates the close() safety net; hasUnsurfacedError() becomes private, so the torn two-read ownership snapshot is no longer writable outside the class. - getLastTerminalServerError() derives the SenderError from the latched LineSenderServerException; the sibling field is gone. --- .../qwp/client/QwpWebSocketSender.java | 31 +++-- .../sf/cursor/CursorWebSocketSendLoop.java | 128 ++++++++++-------- .../sf/cursor/CloseOwnershipRaceTest.java | 10 +- ...CursorWebSocketSendLoopErrorLatchTest.java | 86 +++++++++--- 4 files changed, 163 insertions(+), 92 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 80e4c9dd..beec7512 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -934,6 +934,12 @@ public void close() { // instance, dropping it at the final rethrow avoids // try-with-resources self-suppression: Throwable.addSuppressed // raises IllegalArgumentException when primary == suppressed. + // Must stay this single read: the snapshot needs the identity of + // the error the user already owns, and only + // getSynchronouslySurfacedError() holds it. Deriving it from two + // separate latch reads races the I/O thread -- a terminal latched + // between the reads would be adopted as user-owned and silently + // dropped (see CloseOwnershipRaceTest). Throwable alreadyOwnedByUser = cursorSendLoop != null ? cursorSendLoop.getSynchronouslySurfacedError() : null; @@ -957,21 +963,20 @@ public void close() { // only when no other channel has already delivered it // to the user. "Already delivered" means either the // producer thread saw it synchronously via - // flush()/append() (synchronouslySurfacedError) or the - // async dispatcher delivered it to a user-installed - // custom handler at any point in this sender's life - // (deliveredToCustomHandler). The latter survives a - // setErrorHandler(null) cleanup in test helpers -- - // once the user has owned an error, close() should - // not double-signal it. The default no-op logging - // handler does not count as "delivered to user", so a - // config-string-only caller still gets the loud - // rethrow on shutdown. + // flush()/append() (checkUnsurfacedError is silent in + // that case) or the async dispatcher delivered it to a + // user-installed custom handler at any point in this + // sender's life (deliveredToCustomHandler, checked + // here). The latter survives a setErrorHandler(null) + // cleanup in test helpers -- once the user has owned + // an error, close() should not double-signal it. The + // default no-op logging handler does not count as + // "delivered to user", so a config-string-only caller + // still gets the loud rethrow on shutdown. boolean alreadyDeliveredToCustomHandler = errorDispatcher != null && errorDispatcher.hasDeliveredToCustomHandler(); - if (!alreadyDeliveredToCustomHandler - && cursorSendLoop.hasUnsurfacedError()) { - cursorSendLoop.checkError(); + if (!alreadyDeliveredToCustomHandler) { + cursorSendLoop.checkUnsurfacedError(); } // 3) Bounded drain: block until the server has ACK'd // everything we just published, or until the diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 5e701308..89179e96 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -173,7 +173,7 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // Exact lastError instance that checkError() has thrown to a synchronous // user-thread caller (flush/append/close). close() uses the instance so it // only suppresses errors the user already owned before close() began. - private volatile Throwable synchronouslySurfacedError; + private volatile LineSenderException synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -195,18 +195,17 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // up" (looks transient). private volatile boolean hasEverConnected; private volatile Thread ioThread; - private volatile Throwable lastError; + // The latched terminal failure — THE exception every checkError() call + // rethrows. Non-LineSenderException causes are wrapped once at latch time + // (recordFatal), so rethrows always deliver the same instance and close() + // can suppress double-signals by identity. + private volatile LineSenderException lastError; // Wall clock of the last outbound activity on the wire -- a sent frame // (trySendOne) or a keepalive PING (sendDurableAckKeepaliveIfDue). // Throttles the durable-ack keepalive PING so it fires only when the // configured interval has elapsed since the most recent outbound event. // Zero until the first send; reset to zero on reconnect. private long lastFrameOrPingNanos; - // Typed payload sibling to lastError. Set when recordFatal is called with - // a SenderError (HALT-policy server rejection or terminal protocol violation); - // remains null for wire-level fatals (reconnect-budget exhaustion, etc). - // Read by QwpWebSocketSender.getLastTerminalError() for ops visibility. - private volatile SenderError lastTerminalServerError; private long nextWireSeq; private volatile SenderProgressDispatcher progressDispatcher; // Frames sent during the post-reconnect catch-up window — i.e. frames @@ -462,15 +461,30 @@ public static boolean isTerminalCloseCode(int code) { /** * Surfaces any error the I/O thread recorded. Called by the producer * thread (typically from inside its append wrapper) so failures don't - * stay silent. Idempotent; once an error is set the loop has already - * exited. + * stay silent. Every call rethrows the exact latched instance — close() + * relies on that identity to suppress double-signals. Idempotent; once + * an error is set the loop has already exited. */ public void checkError() { - Throwable e = lastError; + LineSenderException e = lastError; if (e != null) { synchronouslySurfacedError = e; - if (e instanceof LineSenderException) throw (LineSenderException) e; - throw new LineSenderException("I/O thread failed: " + e.getMessage(), e); + throw e; + } + } + + /** + * Safety-net variant of {@link #checkError()} for + * {@code QwpWebSocketSender.close()}: rethrows the latched terminal error + * only when no synchronous caller has owned it yet. A user who already + * caught the error from flush()/append() stays undisturbed — throwing + * again from close() would double-signal an error they already handled. + * A user who only ever called close() (e.g. async-initial-connect that + * never reached the server) still gets the loud failure. + */ + public void checkUnsurfacedError() { + if (hasUnsurfacedError()) { + checkError(); } } @@ -524,24 +538,35 @@ public Throwable getLastError() { return lastError; } + /** + * Typed server-rejection payload of the latched terminal error, or + * {@code null} when the loop latched a wire-level failure (or nothing). + * Derived from the latch — a server-rejection terminal is always latched + * as a {@link LineSenderServerException} carrying its {@link SenderError}. + */ + public SenderError getLastTerminalServerError() { + LineSenderException e = lastError; + return e instanceof LineSenderServerException + ? ((LineSenderServerException) e).getServerError() : null; + } + /** * Returns the exact latched throwable instance already thrown by * {@link #checkError()}, or {@code null} when no synchronous caller has * owned the terminal error yet. + *

+ * This is the single read {@code QwpWebSocketSender.close()} uses to learn + * which terminal error the user already owns. The ownership decision must + * be taken from this one field only: deriving it from two separate latch + * reads (e.g. an unsurfaced-check followed by {@link #getLastError()}) + * races the I/O thread — a terminal latched between the reads gets + * mis-captured as already-owned and is then silently dropped on close(). + * Guarded by {@code CloseOwnershipRaceTest}. */ public Throwable getSynchronouslySurfacedError() { return synchronouslySurfacedError; } - /** - * Snapshot of the typed server-rejection payload for the latched terminal error, - * or {@code null} if the loop has not latched a server-rejection terminal (or has - * latched only a wire-level failure with no SenderError associated). - */ - public SenderError getLastTerminalServerError() { - return lastTerminalServerError; - } - public long getTotalAcks() { return totalAcks.get(); } @@ -610,19 +635,6 @@ public boolean hasEverConnected() { return hasEverConnected; } - /** - * True when {@link #lastError} is set AND no synchronous user-thread - * caller has yet seen that same instance via {@link #checkError()}. - * close() uses this to decide whether to rethrow as a safety net: a user - * who only ever called close() (e.g. async-initial-connect that never - * reached the server) needs to see the error from somewhere; a user who - * already caught it from flush() does not. - */ - public boolean hasUnsurfacedError() { - Throwable e = lastError; - return e != null && synchronouslySurfacedError != e; - } - public boolean isRunning() { return running; } @@ -836,7 +848,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpDurableAckMismatchException e) { @@ -861,7 +873,7 @@ private void connectLoop(Throwable initial, String phase) { System.nanoTime() ); totalServerErrors.incrementAndGet(); - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } catch (QwpRoleMismatchException | QwpIngressRoleRejectedException e) { @@ -941,7 +953,7 @@ private void connectLoop(Throwable initial, String phase) { totalServerErrors.incrementAndGet(); // recordFatal MUST run before dispatchError so the producer-observable // terminal error is latched before the handler is invoked. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); // Surface the terminal classification through the connection-event // dispatcher too. Listeners learn about budget exhaustion without @@ -1047,6 +1059,18 @@ private void fail(Throwable initial) { connectLoop(initial, "reconnect"); } + /** + * True when {@link #lastError} is set AND no synchronous user-thread + * caller has yet seen that same instance via {@link #checkError()}. + * The {@link #checkUnsurfacedError()} safety net composes this with + * checkError(); reads {@code lastError} once so the comparison cannot + * tear against a concurrent latch. + */ + private boolean hasUnsurfacedError() { + Throwable e = lastError; + return e != null && synchronouslySurfacedError != e; + } + private void ioLoop() { try { // Async-initial-connect path: ctor accepted a null client because @@ -1148,27 +1172,21 @@ private void positionCursorInSegment(MmapSegment seg, long targetFsn) { /** * Mark the loop as fatally failed. Caller has decided no reconnect - * is possible (or it ran out of budget) — record the error so + * is possible (or it ran out of budget) — latch the error so * {@link #checkError} can surface it to the producer thread, then - * stop the loop. + * stop the loop. Idempotent — only the first failure latches. + * Non-{@link LineSenderException} causes are wrapped once here, so + * every rethrow delivers the same instance. */ private void recordFatal(Throwable t) { - recordFatal(t, null); - } - - /** - * Server-rejection-aware variant. Stashes a typed {@link SenderError} alongside - * the throwable so {@code QwpWebSocketSender.getLastTerminalError()} can surface - * the structured payload for ops/observability. Idempotent — only the first - * failure latches. - */ - private void recordFatal(Throwable t, SenderError serverError) { if (lastError == null) { - lastError = t; - lastTerminalServerError = serverError; + lastError = t instanceof LineSenderException + ? (LineSenderException) t + : new LineSenderException("I/O thread failed: " + t.getMessage(), t); } running = false; - if (serverError != null) { + if (t instanceof LineSenderServerException) { + // server rejections carry a structured message; the stack adds noise LOG.error("Cursor I/O loop failure: {}", t.getMessage()); } else { LOG.error("Cursor I/O loop failure: {}", t.getMessage(), t); @@ -1480,7 +1498,7 @@ public void onClose(int code, String reason) { // recordFatal MUST run before dispatchError so the producer- // observable terminal error is latched before the handler is // invoked. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); return; } @@ -1519,7 +1537,7 @@ private void handlePreSendRejection(long wireSeq, byte status, // so a synchronous probe of getLastTerminalError() / flush() // from inside the handler observes the typed error. Mirrors // the ordering in the post-send HALT path below. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); } // DROP_AND_CONTINUE: no watermark advance -- there is nothing // sent on this connection to drop. The dispatch is the user's @@ -1584,7 +1602,7 @@ private void handleServerRejection(long wireSeq) { // probes getLastTerminalError() (or calls flush()) sees the // typed error rather than null. Bytes on disk are the bytes // the server rejected; reconnect/replay cannot fix them. - recordFatal(new LineSenderServerException(err), err); + recordFatal(new LineSenderServerException(err)); dispatchError(err); } else { // DROP_AND_CONTINUE: advance ackedFsn past the rejected span diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java index eb4d161c..5f8a0e8d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -37,8 +37,14 @@ * reads ({@code !hasUnsurfacedError() ? getLastError() : null}), so a terminal error * latched by the I/O thread between the reads was mis-captured as already-owned and * silently dropped on close(). The fix is the single read - * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}; this test races that - * read against real latch transitions and fails on any torn snapshot. + * {@link CursorWebSocketSendLoop#getSynchronouslySurfacedError()}. + *

+ * Scope: this test races that accessor and fails if its implementation ever recomputes + * the snapshot from two reads. It does not execute {@code QwpWebSocketSender.close()} + * itself — old and new snapshots agree in every non-racy state, so no black-box test + * at the sender level can detect a callsite that bypasses the accessor; that half of + * the contract is pinned by the "must stay this single read" comments at the callsite + * and on the accessor. */ public class CloseOwnershipRaceTest { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index 56ae0577..d238da42 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -67,31 +67,45 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { } Assert.assertSame("checkError must mark the exact latched throwable as user-owned", original, loop.getSynchronouslySurfacedError()); - Assert.assertFalse("a synchronously surfaced latch is no longer unsurfaced", - loop.hasUnsurfacedError()); + // a synchronously surfaced latch is owned -- the close() safety net + // must stay silent now + loop.checkUnsurfacedError(); } @Test - public void testCheckErrorWrapsNonLineSenderThrowable() throws Exception { - // For non-LineSenderException throwables (NPE, IOException, etc.), - // checkError wraps in a fresh LineSenderException with the original - // as cause so producers always see one exception type. + public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { + // Non-LineSenderException causes (NPE, IOException, etc.) are wrapped + // in a LineSenderException ONCE, at latch time -- so producers always + // see one exception type AND every rethrow delivers the same instance, + // which close() relies on to suppress double-signals by identity. CursorWebSocketSendLoop loop = newBareLoop(); Throwable raw = new RuntimeException("oh no"); - setField(loop, "lastError", raw); + invokeRecordFatal(loop, raw); Assert.assertNull(loop.getSynchronouslySurfacedError()); + LineSenderException first = null; try { loop.checkError(); Assert.fail("expected throw"); } catch (LineSenderException thrown) { Assert.assertNotSame(raw, thrown); - Assert.assertEquals(raw, thrown.getCause()); + Assert.assertSame(raw, thrown.getCause()); Assert.assertTrue(thrown.getMessage().contains("oh no")); + first = thrown; + } + Assert.assertSame("the latch must hold the wrapper, not the raw cause", + first, loop.getLastError()); + Assert.assertSame("ownership tracks the latched wrapper", + first, loop.getSynchronouslySurfacedError()); + loop.checkUnsurfacedError(); // owned -> silent + + try { + loop.checkError(); + Assert.fail("expected throw"); + } catch (LineSenderException thrownAgain) { + Assert.assertSame("every rethrow must deliver the same instance", + first, thrownAgain); } - Assert.assertSame("wrapped throwables are owned by their latched source instance", - raw, loop.getSynchronouslySurfacedError()); - Assert.assertFalse(loop.hasUnsurfacedError()); } @Test @@ -101,6 +115,33 @@ public void testCheckErrorIsNoopWhenNoLatch() throws Exception { loop.checkError(); // must not throw } + @Test + public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception { + // The close() safety net: an unowned latch must rethrow exactly like + // checkError; once any synchronous caller has owned the error, it + // must stay silent -- unlike checkError, which keeps throwing. + CursorWebSocketSendLoop loop = newBareLoop(); + loop.checkUnsurfacedError(); // no latch -> silent + + LineSenderException e = new LineSenderException("wire fail"); + setField(loop, "lastError", e); + try { + loop.checkUnsurfacedError(); + Assert.fail("an unowned latch must rethrow from the safety net"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + + loop.checkUnsurfacedError(); // the throw above made the caller owner -> silent + + try { + loop.checkError(); + Assert.fail("checkError must keep rethrowing a latched error"); + } catch (LineSenderException thrown) { + Assert.assertSame(e, thrown); + } + } + @Test public void testGetLastErrorReturnsLatchedThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); @@ -123,10 +164,10 @@ public void testRecordFatalLatchesThrowableOnly() throws Exception { setField(loop, "running", true); Throwable e = new LineSenderException("wire fail"); - invokeRecordFatal(loop, e, null); + invokeRecordFatal(loop, e); Assert.assertSame(e, loop.getLastError()); - Assert.assertNull("typed payload must be null when recordFatal called without one", + Assert.assertNull("typed payload must be null for a wire-level fatal", loop.getLastTerminalServerError()); Assert.assertFalse("recordFatal must stop the loop", (Boolean) getField(loop, "running")); @@ -139,10 +180,11 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception SenderError err = newSenderError(); LineSenderServerException ex = new LineSenderServerException(err); - invokeRecordFatal(loop, ex, err); + invokeRecordFatal(loop, ex); Assert.assertSame(ex, loop.getLastError()); - Assert.assertSame(err, loop.getLastTerminalServerError()); + Assert.assertSame("typed payload is derived from the latched LineSenderServerException", + err, loop.getLastTerminalServerError()); Assert.assertFalse((Boolean) getField(loop, "running")); } @@ -150,13 +192,13 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception public void testRecordFatalIsIdempotent() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); setField(loop, "running", true); - Throwable first = new LineSenderException("first"); - Throwable second = new LineSenderException("second"); SenderError firstErr = newSenderError(); SenderError secondErr = newSenderError(); + LineSenderServerException first = new LineSenderServerException(firstErr); + LineSenderServerException second = new LineSenderServerException(secondErr); - invokeRecordFatal(loop, first, firstErr); - invokeRecordFatal(loop, second, secondErr); + invokeRecordFatal(loop, first); + invokeRecordFatal(loop, second); // Only the first failure latches — subsequent calls must not // overwrite, otherwise a follow-on cascade would mask the original @@ -199,11 +241,11 @@ private static Object getField(Object target, String name) throws Exception { return f.get(target); } - private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t, SenderError err) + private static void invokeRecordFatal(CursorWebSocketSendLoop loop, Throwable t) throws Exception { Method m = CursorWebSocketSendLoop.class.getDeclaredMethod( - "recordFatal", Throwable.class, SenderError.class); + "recordFatal", Throwable.class); m.setAccessible(true); - m.invoke(loop, t, err); + m.invoke(loop, t); } } From 26a9685841a31a2bd7a2f6c375927a1afab79ebb Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 18:22:17 +0200 Subject: [PATCH 04/10] Pin both branches of close()'s safety-net rethrow No sender-level test asserted WHETHER close() throws: the existing close assertions (InitialConnectAsyncTest) install a handler and tolerate both outcomes, so deleting the safety net, inverting its handler gate, or always-suppressing the snapshot all survived the suite. CloseSafetyNetTest adds the strict pair, each awaiting the latched terminal deterministically before close(): - a config-string-only caller who never flushed must get the typed terminal thrown from close(); - a caller whose custom handler already received the error must get a silent close(). All three mutations above now fail the suite. The snapshot race itself stays covered by CloseOwnershipRaceTest at the accessor. Co-Authored-By: Claude Fable 5 --- .../qwp/client/CloseSafetyNetTest.java | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java new file mode 100644 index 00000000..a874063a --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseSafetyNetTest.java @@ -0,0 +1,151 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.LineSenderServerException; +import io.questdb.client.Sender; +import io.questdb.client.SenderError; +import io.questdb.client.SenderErrorHandler; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Pins both branches of {@code QwpWebSocketSender.close()}'s safety-net + * rethrow, strictly — unlike the close assertions in + * {@link InitialConnectAsyncTest}, which tolerate either outcome: + *

    + *
  • a config-string-only caller (no custom error handler) who never calls + * flush() MUST see the latched terminal thrown from close() — close() is the + * only channel left;
  • + *
  • a caller whose custom error handler already received the terminal MUST + * NOT have it thrown again from close() — that would double-signal an error + * the user already handled and mask try-with-resources cleanup.
  • + *
+ * Both tests latch the terminal deterministically (await it) before calling + * close(), so they pin the safety net's logic, not the snapshot race — + * {@code CloseOwnershipRaceTest} covers that separately. + */ +public class CloseSafetyNetTest { + + @Rule + public final TemporaryFolder sfDir = TemporaryFolder.builder().assureDeletion().build(); + + @Test(timeout = 30_000) + public void testCloseRethrowsUnsurfacedTerminalWithoutCustomHandler() throws Exception { + // No server, no handler, tight reconnect budget: the I/O thread + // latches a never-connected budget-exhaustion terminal that nothing + // has surfaced to the user. close() must throw it. + Sender sender = Sender.fromConfig(cfg()); + boolean closed = false; + try { + awaitLatchedTerminal((QwpWebSocketSender) sender); + try { + closed = true; + sender.close(); + Assert.fail("close() must rethrow a terminal error that no synchronous " + + "caller and no custom handler has seen"); + } catch (LineSenderException e) { + String msg = e.getMessage() == null ? "" : e.getMessage(); + Assert.assertTrue("close() must rethrow the latched terminal: " + msg, + msg.contains("never-connected-budget-exhausted")); + Assert.assertTrue("the latched instance is the typed server exception", + e instanceof LineSenderServerException); + } + } finally { + if (!closed) { + sender.close(); + } + } + } + + @Test(timeout = 30_000) + public void testCloseStaysSilentWhenCustomHandlerAlreadyDelivered() throws Exception { + // Same terminal, but the user installed a custom error handler and + // the dispatcher delivered the error to it. close() must NOT + // double-signal. + ErrorInbox inbox = new ErrorInbox(); + Sender sender = Sender.builder(cfg()) + .errorHandler(inbox) + .build(); + Assert.assertTrue("terminal must reach the custom handler within 10s", + inbox.await(10, TimeUnit.SECONDS)); + Assert.assertNotNull(inbox.get()); + // The handler owns the error now; a rethrow here would double-signal. + sender.close(); + } + + /** + * Awaits the I/O thread's terminal latch via the read-only ops probe. + * getLastTerminalError() does not mark the error as surfaced, so the + * "no synchronous caller has seen it" precondition stays intact. + */ + private static void awaitLatchedTerminal(QwpWebSocketSender sender) { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (sender.getLastTerminalError() == null) { + if (System.nanoTime() > deadlineNanos) { + throw new AssertionError("I/O thread did not latch a terminal within 10s"); + } + Thread.onSpinWait(); + } + } + + private String cfg() { + return "ws::addr=localhost:" + TestPorts.findUnusedPort() + + ";sf_dir=" + sfDir.getRoot().getAbsolutePath() + + ";initial_connect_retry=async" + + ";reconnect_max_duration_millis=400" + + ";reconnect_initial_backoff_millis=10" + + ";reconnect_max_backoff_millis=50" + + ";close_flush_timeout_millis=0;"; + } + + private static class ErrorInbox implements SenderErrorHandler { + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference ref = new AtomicReference<>(); + + boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + SenderError get() { + return ref.get(); + } + + @Override + public void onError(SenderError err) { + if (ref.compareAndSet(null, err)) { + latch.countDown(); + } + } + } +} From 7a719efd8ddc1375b86fc63e29b91739684a6b20 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 18:23:10 +0200 Subject: [PATCH 05/10] style --- .../sf/cursor/CursorWebSocketSendLoop.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 89179e96..1f33ed4c 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -88,14 +88,22 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { */ public static final long DEFAULT_DURABLE_ACK_KEEPALIVE_INTERVAL_MILLIS = 200L; public static final long DEFAULT_PARK_NANOS = 50_000L; // 50us idle backoff - /** Default initial reconnect backoff (100 ms). */ + /** + * Default initial reconnect backoff (100 ms). + */ public static final long DEFAULT_RECONNECT_INITIAL_BACKOFF_MILLIS = 100L; - /** Default reconnect max backoff (5 s). */ + /** + * Default reconnect max backoff (5 s). + */ public static final long DEFAULT_RECONNECT_MAX_BACKOFF_MILLIS = 5_000L; - /** Default per-outage reconnect time cap (5 min). */ + /** + * Default per-outage reconnect time cap (5 min). + */ public static final long DEFAULT_RECONNECT_MAX_DURATION_MILLIS = 300_000L; private static final Logger LOG = LoggerFactory.getLogger(CursorWebSocketSendLoop.class); - /** Throttle "reconnect attempt N failed" WARN logs to one per 5 s. */ + /** + * Throttle "reconnect attempt N failed" WARN logs to one per 5 s. + */ private static final long RECONNECT_LOG_THROTTLE_NANOS = 5_000_000_000L; // Pre-converted to nanos for the comparison in sendDurableAckKeepaliveIfDue. // Zero or negative disables the keepalive entirely. @@ -170,10 +178,6 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // directly to the same dispatcher from QwpWebSocketSender. private volatile SenderConnectionDispatcher connectionDispatcher; private volatile SenderErrorDispatcher errorDispatcher; - // Exact lastError instance that checkError() has thrown to a synchronous - // user-thread caller (flush/append/close). close() uses the instance so it - // only suppresses errors the user already owned before close() began. - private volatile LineSenderException synchronouslySurfacedError; // The send cursor has two coordinate systems: // // FSN: durable frame sequence number in the local cursor engine. This is @@ -224,6 +228,10 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // at engine.activeSegment(); advances to newer sealed segments / the new // active as the producer rotates. private MmapSegment sendingSegment; + // Exact lastError instance that checkError() has thrown to a synchronous + // user-thread caller (flush/append/close). close() uses the instance so it + // only suppresses errors the user already owned before close() began. + private volatile LineSenderException synchronouslySurfacedError; /** * Full constructor with explicit reconnect-policy knobs. When @@ -315,7 +323,9 @@ public CursorWebSocketSendLoop(WebSocketClient client, CursorSendEngine engine, this.hasEverConnected = client != null; } - /** Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. */ + /** + * Maps a server status byte to a {@link SenderError.Category}. Exposed for unit tests. + */ @TestOnly public static SenderError.Category classify(byte status) { switch (status) { @@ -373,7 +383,7 @@ public static WebSocketClient connectWithRetry( return c; } } catch (QwpAuthFailedException | QwpDurableAckMismatchException - | WebSocketUpgradeException e) { + | WebSocketUpgradeException e) { // Terminal across all configured endpoints per sf-client.md sections // 8.1 (durable-ack mismatch) and 13.3 (auth). Version mismatch is // NOT terminal here -- it falls through to the Throwable branch and @@ -605,7 +615,9 @@ public long getTotalFramesSent() { return totalFramesSent.get(); } - /** Total reconnect attempts (succeeded + failed). */ + /** + * Total reconnect attempts (succeeded + failed). + */ public long getTotalReconnectAttempts() { return totalReconnectAttempts.get(); } @@ -771,7 +783,7 @@ private void applyDurableAck() { */ private void attemptInitialConnect() { connectLoop(new LineSenderException( - "async initial connect deferred to I/O thread"), + "async initial connect deferred to I/O thread"), "initial connect"); } @@ -1406,7 +1418,9 @@ boolean isDurableUnder(CharSequenceLongHashMap watermarks) { } } - /** Inner ACK handler — parses the binary frame, calls engine.acknowledge. */ + /** + * Inner ACK handler — parses the binary frame, calls engine.acknowledge. + */ private final class ResponseHandler implements WebSocketFrameHandler { @Override public void onBinaryMessage(long payloadPtr, int payloadLen) { From 5b7afcd4df252e51e7db2a40862f52ecb06ba5e3 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 11 Jun 2026 21:56:36 +0200 Subject: [PATCH 06/10] Rename the error latch field to terminalError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit lastError suggested most-recent-wins, but the field is a write-once first-writer-wins latch: transient failures reconnect upstream and never reach it; only the error that ends the loop is latched. The name now matches the documented invariant and the "latched terminal" language used throughout. recordFatal's javadoc also states the single-writer rule explicitly: the unsynchronized check-then-latch is only safe because every caller runs on the I/O thread. The retry-loop locals keep the lastError name — there it is accurate. Co-Authored-By: Claude Fable 5 --- .../qwp/client/QwpWebSocketSender.java | 2 +- .../sf/cursor/CursorWebSocketSendLoop.java | 50 ++++++++++++------- .../IoThreadErrorSurfacedOnRowApiTest.java | 2 +- .../qwp/client/PrReviewRedTestsE2e.java | 2 +- .../sf/cursor/CloseOwnershipRaceTest.java | 2 +- ...CursorWebSocketSendLoopErrorLatchTest.java | 26 +++++----- 6 files changed, 48 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index beec7512..545b848a 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -2563,7 +2563,7 @@ private void checkConnectionError() { error.fillInStackTrace(); throw error; } - // Poll the cursor I/O loop's lastError too. Without this, a fatal + // Poll the cursor I/O loop's terminalError too. Without this, a fatal // wire / server-rejection error recorded by the I/O thread would // only surface on the next flush() / close() — every row-level // method (table, longColumn, atNow, etc.) routes through diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 1f33ed4c..77db27c8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -71,7 +71,7 @@ * into the engine; this thread reads. {@code engine.publishedFsn()} is * the volatile publish barrier. *

- * Errors are reported via {@link #getLastError()}; the I/O thread sets it + * Errors are reported via {@link #getTerminalError()}; the I/O thread sets it * and exits. Producers polling {@link #checkError()} surface the failure. */ public final class CursorWebSocketSendLoop implements QuietCloseable { @@ -200,10 +200,14 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { private volatile boolean hasEverConnected; private volatile Thread ioThread; // The latched terminal failure — THE exception every checkError() call - // rethrows. Non-LineSenderException causes are wrapped once at latch time - // (recordFatal), so rethrows always deliver the same instance and close() - // can suppress double-signals by identity. - private volatile LineSenderException lastError; + // rethrows. Write-once for the loop's lifetime: the only writer is + // recordFatal on the I/O thread (first-writer-wins). The whole + // close()-ownership protocol rests on that — the identity comparisons + // in hasUnsurfacedError() and in close()'s suppression are only + // meaningful because the latched instance never changes. + // Non-LineSenderException causes are wrapped once at latch time, so + // rethrows always deliver the same instance. + private volatile LineSenderException terminalError; // Wall clock of the last outbound activity on the wire -- a sent frame // (trySendOne) or a keepalive PING (sendDurableAckKeepaliveIfDue). // Throttles the durable-ack keepalive PING so it fires only when the @@ -228,7 +232,7 @@ public final class CursorWebSocketSendLoop implements QuietCloseable { // at engine.activeSegment(); advances to newer sealed segments / the new // active as the producer rotates. private MmapSegment sendingSegment; - // Exact lastError instance that checkError() has thrown to a synchronous + // Exact terminalError instance that checkError() has thrown to a synchronous // user-thread caller (flush/append/close). close() uses the instance so it // only suppresses errors the user already owned before close() began. private volatile LineSenderException synchronouslySurfacedError; @@ -476,7 +480,7 @@ public static boolean isTerminalCloseCode(int code) { * an error is set the loop has already exited. */ public void checkError() { - LineSenderException e = lastError; + LineSenderException e = terminalError; if (e != null) { synchronouslySurfacedError = e; throw e; @@ -544,10 +548,6 @@ public synchronized void close() { } } - public Throwable getLastError() { - return lastError; - } - /** * Typed server-rejection payload of the latched terminal error, or * {@code null} when the loop latched a wire-level failure (or nothing). @@ -555,7 +555,7 @@ public Throwable getLastError() { * as a {@link LineSenderServerException} carrying its {@link SenderError}. */ public SenderError getLastTerminalServerError() { - LineSenderException e = lastError; + LineSenderException e = terminalError; return e instanceof LineSenderServerException ? ((LineSenderServerException) e).getServerError() : null; } @@ -568,7 +568,7 @@ public SenderError getLastTerminalServerError() { * This is the single read {@code QwpWebSocketSender.close()} uses to learn * which terminal error the user already owns. The ownership decision must * be taken from this one field only: deriving it from two separate latch - * reads (e.g. an unsurfaced-check followed by {@link #getLastError()}) + * reads (e.g. an unsurfaced-check followed by {@link #getTerminalError()}) * races the I/O thread — a terminal latched between the reads gets * mis-captured as already-owned and is then silently dropped on close(). * Guarded by {@code CloseOwnershipRaceTest}. @@ -577,6 +577,14 @@ public Throwable getSynchronouslySurfacedError() { return synchronouslySurfacedError; } + /** + * The latched terminal failure, or {@code null} while the loop is + * healthy. Read-only — does not mark the error as surfaced. + */ + public Throwable getTerminalError() { + return terminalError; + } + public long getTotalAcks() { return totalAcks.get(); } @@ -1072,14 +1080,14 @@ private void fail(Throwable initial) { } /** - * True when {@link #lastError} is set AND no synchronous user-thread + * True when {@link #terminalError} is set AND no synchronous user-thread * caller has yet seen that same instance via {@link #checkError()}. * The {@link #checkUnsurfacedError()} safety net composes this with - * checkError(); reads {@code lastError} once so the comparison cannot + * checkError(); reads {@code terminalError} once so the comparison cannot * tear against a concurrent latch. */ private boolean hasUnsurfacedError() { - Throwable e = lastError; + Throwable e = terminalError; return e != null && synchronouslySurfacedError != e; } @@ -1186,13 +1194,17 @@ private void positionCursorInSegment(MmapSegment seg, long targetFsn) { * Mark the loop as fatally failed. Caller has decided no reconnect * is possible (or it ran out of budget) — latch the error so * {@link #checkError} can surface it to the producer thread, then - * stop the loop. Idempotent — only the first failure latches. + * stop the loop. First-writer-wins: only the first failure latches. + * The check-then-latch is unsynchronized and is safe ONLY because + * every caller runs on the I/O thread (connectLoop and the + * receive-path rejection handlers are all pumped by ioLoop); calling + * this from any other thread would be a lost-update race. * Non-{@link LineSenderException} causes are wrapped once here, so * every rethrow delivers the same instance. */ private void recordFatal(Throwable t) { - if (lastError == null) { - lastError = t instanceof LineSenderException + if (terminalError == null) { + terminalError = t instanceof LineSenderException ? (LineSenderException) t : new LineSenderException("I/O thread failed: " + t.getMessage(), t); } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java index 3300ae86..2f5a30a8 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/IoThreadErrorSurfacedOnRowApiTest.java @@ -80,7 +80,7 @@ public void testRowApiMethodSurfacesIoThreadTerminalError() throws Exception { sender.flush(); // Wait for the I/O thread to record the error. After this, - // cursorSendLoop.lastError is populated and the loop has + // cursorSendLoop.terminalError is populated and the loop has // exited. QwpWebSocketSender wss = (QwpWebSocketSender) sender; long deadline = System.currentTimeMillis() + 3_000L; diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java index e3f80e96..669aaf23 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/PrReviewRedTestsE2e.java @@ -66,7 +66,7 @@ public class PrReviewRedTestsE2e { *

* Concrete consequence the spec calls out: a user-supplied error handler * that synchronously calls {@code sender.flush()} from inside - * {@code onError} can observe {@code lastError == null} and pass — + * {@code onError} can observe {@code terminalError == null} and pass — * landing post-HALT bytes in the engine. *

* This test asserts the spec invariant directly: by the time the diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java index 5f8a0e8d..f4cbffd1 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CloseOwnershipRaceTest.java @@ -78,7 +78,7 @@ public void closeOwnershipSnapshotNeverClaimsAnUnsurfacedError() { // code it reads a field nobody here writes — it cannot flake; // a reintroduced two-read snapshot tears when the latch lands // between its reads, with near-certainty across all rounds. - while (leaked == null && loop.getLastError() == null) { + while (leaked == null && loop.getTerminalError() == null) { leaked = loop.getSynchronouslySurfacedError(); } loop.close(); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java index d238da42..80c23ac2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoopErrorLatchTest.java @@ -37,7 +37,7 @@ /** * Pinpointed tests for the latched-error contract on {@link CursorWebSocketSendLoop}: - * {@code recordFatal} → {@link CursorWebSocketSendLoop#getLastError} + + * {@code recordFatal} → {@link CursorWebSocketSendLoop#getTerminalError} + * {@link CursorWebSocketSendLoop#getLastTerminalServerError} + * {@link CursorWebSocketSendLoop#checkError}. Bypasses the constructor entirely * via {@code Unsafe.allocateInstance} to avoid the live wire/engine dependencies @@ -53,7 +53,7 @@ public void testCheckErrorRethrowsLineSenderException() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); SenderError err = newSenderError(); LineSenderServerException original = new LineSenderServerException(err); - setField(loop, "lastError", original); + setField(loop, "terminalError", original); Assert.assertNull(loop.getSynchronouslySurfacedError()); try { @@ -94,7 +94,7 @@ public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { first = thrown; } Assert.assertSame("the latch must hold the wrapper, not the raw cause", - first, loop.getLastError()); + first, loop.getTerminalError()); Assert.assertSame("ownership tracks the latched wrapper", first, loop.getSynchronouslySurfacedError()); loop.checkUnsurfacedError(); // owned -> silent @@ -111,7 +111,7 @@ public void testRecordFatalWrapsNonLineSenderThrowableOnce() throws Exception { @Test public void testCheckErrorIsNoopWhenNoLatch() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); - Assert.assertNull(loop.getLastError()); + Assert.assertNull(loop.getTerminalError()); loop.checkError(); // must not throw } @@ -124,7 +124,7 @@ public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception loop.checkUnsurfacedError(); // no latch -> silent LineSenderException e = new LineSenderException("wire fail"); - setField(loop, "lastError", e); + setField(loop, "terminalError", e); try { loop.checkUnsurfacedError(); Assert.fail("an unowned latch must rethrow from the safety net"); @@ -143,18 +143,18 @@ public void testCheckUnsurfacedErrorThrowsOnceThenStaysSilent() throws Exception } @Test - public void testGetLastErrorReturnsLatchedThrowable() throws Exception { + public void testGetTerminalErrorReturnsLatchedThrowable() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Throwable e = new LineSenderException("boom"); - setField(loop, "lastError", e); - Assert.assertSame(e, loop.getLastError()); + setField(loop, "terminalError", e); + Assert.assertSame(e, loop.getTerminalError()); } @Test - public void testGetLastErrorIsNullBeforeAnyFailure() throws Exception { + public void testGetTerminalErrorIsNullBeforeAnyFailure() throws Exception { CursorWebSocketSendLoop loop = newBareLoop(); Assert.assertNull("loops with no latched error must report null", - loop.getLastError()); + loop.getTerminalError()); } @Test @@ -166,7 +166,7 @@ public void testRecordFatalLatchesThrowableOnly() throws Exception { invokeRecordFatal(loop, e); - Assert.assertSame(e, loop.getLastError()); + Assert.assertSame(e, loop.getTerminalError()); Assert.assertNull("typed payload must be null for a wire-level fatal", loop.getLastTerminalServerError()); Assert.assertFalse("recordFatal must stop the loop", @@ -182,7 +182,7 @@ public void testRecordFatalLatchesBothThrowableAndSenderError() throws Exception invokeRecordFatal(loop, ex); - Assert.assertSame(ex, loop.getLastError()); + Assert.assertSame(ex, loop.getTerminalError()); Assert.assertSame("typed payload is derived from the latched LineSenderServerException", err, loop.getLastTerminalServerError()); Assert.assertFalse((Boolean) getField(loop, "running")); @@ -204,7 +204,7 @@ public void testRecordFatalIsIdempotent() throws Exception { // overwrite, otherwise a follow-on cascade would mask the original // root cause. Assert.assertSame("first throwable must remain latched", - first, loop.getLastError()); + first, loop.getTerminalError()); Assert.assertSame("first SenderError must remain latched", firstErr, loop.getLastTerminalServerError()); } From 92f9bfa2c3f93ded0b70a7c624ced4330fc8e997 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 15 Jun 2026 13:10:24 +0100 Subject: [PATCH 07/10] test(qwp): red test for M3 drainOnClose() double-signal Pin the pre-existing close() double-signal: when an async custom error handler has already received the terminal, close_flush_timeout_millis > 0, and an unacked tail remains, close() -> drainOnClose() -> checkError() (QwpWebSocketSender.java:2657) re-throws the same terminal the handler already owns. The alreadyDeliveredToCustomHandler guard wraps only the step-2 checkUnsurfacedError() safety net, not the step-3 drain; the end-of-close self-suppression (terminalError == alreadyOwnedByUser) only covers SYNCHRONOUS ownership (getSynchronouslySurfacedError()), so the async-handler-only case is not suppressed. Contradicts the in-code comment: 'once the user has owned an error, close() should not double-signal it.' The server fixture gates the HALT rejection until after flush() returns so the error reaches the user only via the async handler (no synchronous surface), making the failure deterministic. Sibling green test CloseSafetyNetTest.testCloseStaysSilentWhenCustomHandlerAlreadyDelivered passes only because it uses close_flush_timeout_millis=0. Currently RED; turns green once the drain stops re-surfacing a terminal the custom handler already owns. Not introduced by this PR (out-of-diff). --- .../client/CloseDrainDoubleSignalTest.java | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java new file mode 100644 index 00000000..ad1397f4 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java @@ -0,0 +1,228 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.LineSenderServerException; +import io.questdb.client.Sender; +import io.questdb.client.SenderError; +import io.questdb.client.SenderErrorHandler; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.WebSocketResponse; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * RED regression test for finding M3 (close-error-race review): a + * pre-existing {@code drainOnClose()} double-signal. + *

+ * When ALL of the following hold: + *

    + *
  • an async custom error handler has already received the terminal + * (so {@code SenderErrorDispatcher.hasDeliveredToCustomHandler()} is + * true), and
  • + *
  • {@code close_flush_timeout_millis > 0} (so the bounded drain runs), + * and
  • + *
  • an unacked tail remains ({@code ackedFsn < publishedFsn} — the + * natural state after a HALT rejection, which never advances + * {@code ackedFsn}),
  • + *
+ * then {@code QwpWebSocketSender.close()} re-throws the same terminal error + * the user's handler already owns. + *

+ * Root cause: in {@code close()} the {@code alreadyDeliveredToCustomHandler} + * guard wraps ONLY the step-2 {@code checkUnsurfacedError()} safety net, not + * the step-3 {@code drainOnClose()}. The drain's loop calls + * {@code cursorSendLoop.checkError()} (QwpWebSocketSender.java:2657) + * unconditionally, which re-throws the latched terminal. The end-of-close + * self-suppression ({@code terminalError == alreadyOwnedByUser}) cannot catch + * this because {@code alreadyOwnedByUser} is sourced from + * {@code getSynchronouslySurfacedError()} — it tracks SYNCHRONOUS ownership + * (a producer-thread {@code flush()}/{@code at()} that caught the error) + * only, and is {@code null} when the error reached the user purely through + * the async handler. + *

+ * This contradicts the in-code comment's stated intent: "once the user has + * owned an error, close() should not double-signal it." + *

+ * The sibling green test + * {@link CloseSafetyNetTest#testCloseStaysSilentWhenCustomHandlerAlreadyDelivered()} + * passes only because it uses {@code close_flush_timeout_millis=0}, which + * makes {@code drainOnClose()} return at its first guard so the buggy + * {@code checkError()} is never reached. This test flips that one knob on and + * adds the unacked tail. + *

+ * Determinism: the server fixture holds the HALT rejection behind a gate that + * the test releases only AFTER {@code flush()} has returned. Without the gate, + * the rejection could latch before {@code flush()}'s own + * {@code checkError()} runs, surfacing the error synchronously, populating + * {@code synchronouslySurfacedError}, and masking the double-signal via the + * identity self-suppression (flaky green). Gating guarantees the error + * reaches the user ONLY through the async handler — the exact precondition + * the finding describes. + */ +public class CloseDrainDoubleSignalTest { + + @Test(timeout = 30_000) + public void testCloseDoesNotDoubleSignalWhenAsyncHandlerOwnsErrorAndDrainRuns() throws Exception { + int port = TestPorts.findUnusedPort(); + GatedHaltHandler server = new GatedHaltHandler(); + try (TestWebSocketServer ws = new TestWebSocketServer(port, server)) { + ws.start(); + Assert.assertTrue(ws.awaitStart(5, TimeUnit.SECONDS)); + + // Memory mode + a positive drain timeout: drainOnClose() WILL run. + String cfg = "ws::addr=localhost:" + port + + ";close_flush_timeout_millis=2000;"; + + ErrorInbox inbox = new ErrorInbox(); + Sender sender = Sender.builder(cfg).errorHandler(inbox).build(); + try { + // Publish exactly one batch: publishedFsn -> 1. flush() runs + // its own checkError() here while the server is still holding + // the rejection behind the gate, so it returns cleanly and + // nothing is surfaced synchronously. + sender.table("foo").longColumn("v", 1L).atNow(); + sender.flush(); + + // Now let the server emit the HALT rejection. It latches the + // terminal (recordFatal) and dispatches it to the async + // handler (dispatchError). ackedFsn stays at 0 (HALT never + // advances the watermark) -> the unacked tail precondition. + server.releaseRejection(); + + Assert.assertTrue( + "precondition: HALT terminal must reach the async custom handler within 10s", + inbox.await(10, TimeUnit.SECONDS)); + + SenderError delivered = inbox.get(); + Assert.assertNotNull("precondition: handler received a SenderError", delivered); + Assert.assertEquals( + "precondition: server status 0x05 must map to PARSE_ERROR", + SenderError.Category.PARSE_ERROR, delivered.getCategory()); + Assert.assertEquals( + "precondition: PARSE_ERROR is a HALT-policy rejection", + SenderError.Policy.HALT, delivered.getAppliedPolicy()); + + // Sanity-check the remaining preconditions are actually live + // before we exercise close(): the terminal is latched and an + // unacked tail remains. + QwpWebSocketSender wss = (QwpWebSocketSender) sender; + Assert.assertNotNull( + "precondition: I/O loop latched the terminal server error", + wss.getLastTerminalError()); + + // The async handler now OWNS the error. Per the in-code + // contract, close() must NOT double-signal it. Today close() + // -> drainOnClose() -> checkError() re-throws it. + try { + sender.close(); + } catch (LineSenderException e) { + Assert.fail( + "M3: close() double-signalled a terminal error the async custom " + + "handler already owns. drainOnClose() (QwpWebSocketSender.java:2657) " + + "re-threw from close(): " + + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } finally { + // Idempotent: if the assertion above already drove close() to + // completion (it runs all cleanup before re-throwing), this is + // a no-op. Guarantees teardown if a precondition assert threw + // before we reached close(). + server.releaseRejection(); + sender.close(); + } + } + } + + /** + * Server fixture that responds to the first binary frame with a + * {@code STATUS_PARSE_ERROR} (HALT-policy) rejection, but only once the + * test releases the gate. Blocking inside {@code onBinaryMessage} mirrors + * the established {@code DelayingAckHandler} pattern in {@link CloseDrainTest}. + */ + private static final class GatedHaltHandler implements TestWebSocketServer.WebSocketServerHandler { + private final CountDownLatch gate = new CountDownLatch(1); + private final AtomicLong nextSeq = new AtomicLong(); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + try { + gate.await(); + client.sendBinary(buildErrorAck(nextSeq.getAndIncrement(), + WebSocketResponse.STATUS_PARSE_ERROR, "test: parse error (HALT)")); + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + void releaseRejection() { + gate.countDown(); + } + } + + // Mirrors WebSocketResponse error layout: status u8 | seq u64 LE | msgLen u16 LE | msg UTF-8 + private static byte[] buildErrorAck(long seq, byte status, String msg) { + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + byte[] buf = new byte[1 + 8 + 2 + msgBytes.length]; + ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN); + bb.put(status); + bb.putLong(seq); + bb.putShort((short) msgBytes.length); + bb.put(msgBytes); + return buf; + } + + private static final class ErrorInbox implements SenderErrorHandler { + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference ref = new AtomicReference<>(); + + boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + SenderError get() { + return ref.get(); + } + + @Override + public void onError(SenderError err) { + if (ref.compareAndSet(null, err)) { + latch.countDown(); + } + } + } +} From 6ebaaf21ff57e3e4170099ac76fa955e10599d02 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 15 Jun 2026 13:35:06 +0100 Subject: [PATCH 08/10] fix(qwp): stop drainOnClose() double-signalling a handler-owned terminal (M3) drainOnClose()'s loop called cursorSendLoop.checkError() unconditionally, which re-throws the latched terminal from close(). When an async custom error handler had already received the terminal, close_flush_timeout_millis > 0, and an unacked tail remained, close() re-surfaced an error the user already owned -- contradicting the in-code intent 'once the user has owned an error, close() should not double-signal it'. The checkError() call conflated two jobs: (1) breaking the drain loop when a terminal means acks will never reach target, and (2) surfacing the error. Job 1 is always needed; job 2 is a policy close() already gates. Fix: drainOnClose(boolean errorOwnedByCustomHandler) mirrors the step-2 safety-net gate. When the handler owns the terminal it stops silently on a latched error (getTerminalError() != null) instead of throwing. Otherwise it keeps checkError() -- which both surfaces the error for a config-string-only caller and breaks the loop; a synchronously-owned instance is still suppressed by close()'s terminalError == alreadyOwnedByUser check, so that path is not double-signalled either. Flips CloseDrainDoubleSignalTest green. Full close/drain/error/reconnect/ failover/durable-ack sweep: 162 tests, 0 failures. --- .../qwp/client/QwpWebSocketSender.java | 45 ++++++++++++++++--- .../client/CloseDrainDoubleSignalTest.java | 10 ++++- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 545b848a..16bf650a 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -982,11 +982,14 @@ public void close() { // everything we just published, or until the // configured timeout elapses. closeFlushTimeoutMillis // <= 0 opts out (fast close, may lose memory-mode - // data on JVM exit). Errors still surface via the - // safety-net checkError() above and via the async - // error handler. + // data on JVM exit). Pass the same ownership flag the + // step-2 safety net used: when the custom handler + // already owns the terminal, the drain must stop on it + // without re-throwing (re-throwing would double-signal + // an error the user already handled). Otherwise the + // drain keeps the loud safety net and surfaces it. if (closeFlushTimeoutMillis > 0L) { - drainOnClose(); + drainOnClose(alreadyDeliveredToCustomHandler); } } } catch (Throwable t) { @@ -2643,8 +2646,30 @@ private void dispatchConnectionEvent( * SF-mode users can recover the unacked tail by reopening a sender on * the same SF directory; memory-mode users have no recovery path and * must treat this as fatal. + *

+ * A latched terminal error means the server will never ACK up to + * {@code target}, so the drain must stop on it regardless. Whether it is + * also re-thrown from close() is a separate surfacing policy that mirrors + * the step-2 safety net in {@link #close()}: + *

    + *
  • {@code errorOwnedByCustomHandler == true}: a custom error handler + * has already delivered this terminal to the user, so stop silently — + * re-throwing here would double-signal it (the M3 drainOnClose + * double-signal).
  • + *
  • {@code errorOwnedByCustomHandler == false}: re-throw via + * {@code checkError()} to preserve the loud safety net (a + * config-string-only caller's only channel). The throw also breaks the + * loop; an error a synchronous {@code flush()}/{@code at()} caller + * already owns is then suppressed by close()'s + * {@code terminalError == alreadyOwnedByUser} check, so it is not + * double-signalled either.
  • + *
+ * + * @param errorOwnedByCustomHandler whether the async dispatcher has + * already delivered a terminal to a + * user-installed handler */ - private void drainOnClose() { + private void drainOnClose(boolean errorOwnedByCustomHandler) { if (closeFlushTimeoutMillis <= 0L) { return; } @@ -2654,7 +2679,15 @@ private void drainOnClose() { } long deadlineNanos = System.nanoTime() + closeFlushTimeoutMillis * 1_000_000L; while (cursorEngine.ackedFsn() < target) { - cursorSendLoop.checkError(); + // Stop on a latched terminal (acks will never reach target); + // surface it only when no other channel already delivered it. + if (errorOwnedByCustomHandler) { + if (cursorSendLoop.getTerminalError() != null) { + return; + } + } else { + cursorSendLoop.checkError(); + } if (System.nanoTime() >= deadlineNanos) { long acked = cursorEngine.ackedFsn(); LOG.warn("close() drain timed out after {}ms [target={} acked={}], pending data may be lost", diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java index ad1397f4..71ba0720 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainDoubleSignalTest.java @@ -45,8 +45,10 @@ import java.util.concurrent.atomic.AtomicReference; /** - * RED regression test for finding M3 (close-error-race review): a - * pre-existing {@code drainOnClose()} double-signal. + * Regression test for finding M3 (close-error-race review): a + * pre-existing {@code drainOnClose()} double-signal. Written RED, now green + * after the drain stopped re-surfacing a terminal a custom handler already + * owns; guards against the double-signal regressing. *

* When ALL of the following hold: *

    @@ -84,6 +86,10 @@ * {@code checkError()} is never reached. This test flips that one knob on and * adds the unacked tail. *

    + * Fix: {@code drainOnClose(boolean)} still stops on a latched terminal (acks + * will never reach target) but re-throws only when no custom handler owns the + * error, mirroring the step-2 safety-net gate. + *

    * Determinism: the server fixture holds the HALT rejection behind a gate that * the test releases only AFTER {@code flush()} has returned. Without the gate, * the rejection could latch before {@code flush()}'s own From 28bd5d7621feae482f9bae0914bc87e531b2d781 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 15 Jun 2026 13:45:30 +0100 Subject: [PATCH 09/10] chore: add review-pr skill under .pi --- .pi/skills/review-pr/SKILL.md | 339 ++++++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 .pi/skills/review-pr/SKILL.md diff --git a/.pi/skills/review-pr/SKILL.md b/.pi/skills/review-pr/SKILL.md new file mode 100644 index 00000000..312cd599 --- /dev/null +++ b/.pi/skills/review-pr/SKILL.md @@ -0,0 +1,339 @@ +--- +name: review-pr +description: Review a GitHub pull request against QuestDB coding standards. Use when asked to review a PR (by number or URL), optionally with a depth level 0..3. Performs an adversarial, blocking, mission-critical code review covering correctness, concurrency, performance, resource management, tests, and QuestDB conventions, then verifies every finding against source before reporting. +allowed-tools: bash read subagent +--- + +# Review PR + +Review the pull request identified by the user's arguments. The arguments (a PR number or URL, optionally with a level token) are appended to this skill as `User: `, or appear in the user's request. Treat that text as `$ARGUMENTS` below. + +## Tooling note (pi) + +This skill was ported from a Claude Code skill. Map the tools as follows: +- Reading files → the `read` tool. +- Searching the repo (the Claude `Grep`/`Glob` steps) → the `bash` tool with `rg`, `grep`, `find`, `ls`. Every "use Grep/Glob" instruction below means "run a real `rg`/`find` search and show it" — do not reason about callsites from memory. +- `gh` commands → the `bash` tool. +- Spawning parallel review "agents" → the `subagent` tool. Launch builtin `reviewer` agents with `context: "fresh"` so each works adversarially from the repo and diff directly, not from this conversation. Example: `subagent({ tasks: [ { agent: "reviewer", task: "..." }, ... ], context: "fresh", concurrency: N })`. Verification passes (Step 3b) are likewise `reviewer` agents launched in parallel where findings are independent. The parent session stays the single decision-maker and writes the final report; reviewers are review-only (do not edit project files). + +## Review mindset + +You are a senior QuestDB engineer performing a blocking code review. QuestDB is mission-critical software deployed on spacecraft — bugs can cause data loss or system failures that cannot be patched after deployment. There is zero tolerance for correctness issues, resource leaks, or undefined behavior. Be critical, thorough, and opinionated. Your job is to catch problems before they ship, not to be nice. + +- **Assume nothing is correct until you've verified it.** Read surrounding code to understand context — don't just look at the diff in isolation. +- **The diff is a hint, not the boundary of the review.** The highest-value bugs almost always live at callsites outside the diff that depend on contracts the diff quietly changed. Treat the diff as the entry point, not the scope. +- **Flag every issue you find**, no matter how small. Do not soften language or hedge. Say "this is wrong" not "this might be an issue". +- **Do not praise the code.** Skip "looks good", "nice work", "clever approach". Focus entirely on problems and risks. +- **Think adversarially.** For each change, ask: what inputs break this? What happens under concurrent access? What if this runs on a 10-billion-row table? What if the column is NULL? What if the partition is empty? +- **Check what's missing**, not just what's there. Missing tests, missing error handling, missing edge cases, missing documentation for non-obvious behavior. +- **Verify every claim.** If the PR title says "fix", verify the bug actually existed and the fix is correct. If it says "improve performance", look for benchmarks or reason about the algorithmic change — does it actually improve things, or could it regress in other cases? If it says "simplify", verify the new code is actually simpler and doesn't drop behavior. Treat the PR description as an unverified hypothesis, not a statement of fact. +- **Read the full context of changed files** when the diff alone is ambiguous. Use `read` and `bash` (rg/grep/find) to inspect the surrounding code, callers, and related tests. +- **Assess reachability before reporting.** For every potential bug, trace the actual callers and inputs. If a problem + requires physically impossible conditions (billions of columns, corrupted JNI inputs, values that no caller can + produce), it is not a real finding — drop it. Focus on bugs that real workloads can trigger, not theoretical edge + cases that exist only in the type system. +- **QuestDB runs with Java assertions enabled (`-ea`).** Assertions are a valid guard for invariants that indicate + corruption or internal bugs. Do NOT flag `assert` as insufficient — it is the preferred mechanism for conditions + that should never occur in a non-corrupt database. Only flag an `assert` if the condition can plausibly be triggered + by normal (non-corrupt) user operations. + +## Review level + +Parse `$ARGUMENTS` for a level token: `--level=N`, `-lN`, or a bare single digit `0`-`3`. **If no level is given, default to 0.** Strip the level token before feeding the remainder (PR number or URL) to `gh` commands. + +The level controls how much of the review below actually runs. Lower levels keep the same review *spirit* — adversarial, blocking, no praise — but cut the breadth of the analysis. Higher levels have significantly higher token cost; reserve level 3 for high-stakes PRs (replication, JNI boundary changes, on-disk format, public API, security/ACL). + +| Level | What runs | +|-------|-----------| +| **0 (default)** | Steps 1, 2, 4. Skip Step 2.5. Skip Step 3 — no subagent spawn; review the diff inline in the main loop, using `read`/`bash` searches on demand to resolve ambiguities. Skip Step 3b — verify each finding inline as you write it. Single-pass review covering correctness, NULL handling, tests, and QuestDB standards on the diff itself. | +| **1** | Adds Step 2.5a (semantic delta only — skip 2.5b/2.5c/2.5d). In Step 3, launch only reviewer 1 (correctness), reviewer 5 (tests), and reviewer 6 (code quality) in parallel. Skip all other reviewers. Skip Step 3b — verify findings inline as you draft the report. | +| **2** | Full Step 2.5, but in 2.5b restrict the callsite inventory to `public`/`protected` symbols (skip package-private and `pub(crate)`). In Step 3, launch reviewers 1-7, plus reviewer 8 if `.rs` files are present. Skip reviewer 9 (cross-context) and reviewer 10 (adversarial fresh-context). Step 3b uses a single batched verification reviewer for all findings instead of one per finding. | +| **3** | Every step below as written, all 10 reviewers, per-finding verification. The full mission-critical pass. | + +State the chosen level in one line at the start of the review so the user knows what they're getting (e.g., "Reviewing PR #1234 at level 2"). If the level was defaulted, mention that level 3 exists for full review. + +## Step 1: Gather PR context + +Capture the PR identifier in `$PR` (the part of `$ARGUMENTS` left after stripping the level token), then fetch metadata, diff, and review comments in a single bash call so `$PR` is in scope for all three `gh` invocations: + +```bash +PR='' +gh pr view "$PR" --json number,title,body,labels,state +gh pr diff "$PR" +gh pr view "$PR" --comments +``` + +## Step 2: PR title and description + +Check against CLAUDE.md conventions: +- Title follows Conventional Commits: `type(scope): description` +- Description repeats the verb (e.g., `fix(sql): fix ...` not `fix(sql): DECIMAL ...`) +- Description speaks to end-user impact, not implementation internals +- If fixing an issue, `Fixes #NNN` is at the top of the body +- Tone is level-headed and analytical, no superlatives or bold emphasis on numbers +- Labels match the PR scope (SQL, Performance, Core, etc.) + +## Step 2.5: Map the change surface + +Before launching review reviewers, produce a structured change surface map. This step is mandatory and must use real `rg`/`find` searches via `bash` — do not reason about callsites from memory. The output of this step is required input for every reviewer in Step 3. + +### 2.5a Semantic delta per changed symbol + +For every modified or added function, method, trait, struct field, SQL operator/function, or public constant, write: + +- **Symbol:** fully-qualified name +- **Before:** signature, return type, error/exception behavior, panic behavior, mutation (`&self` vs `&mut self`, `final` vs not), ordering/idempotency guarantees, allocation behavior, thread-safety +- **After:** same fields +- **Delta:** one line stating what semantically changed + +"Refactored", "cleaned up", "improved", "simplified" are not acceptable deltas. State the actual behavioral difference. If nothing semantically changed, write "no behavioral change" — but only after checking, not as a default. + +### 2.5b Callsite inventory + +For every changed symbol that is `public`, `protected`, package-private, or exported (`pub` / `pub(crate)` in Rust), run `rg` across the entire repository to find every callsite, implementation, override, or reference outside the diff. + +Produce a list grouped by file. For Java, also search for: +- subclasses that override the method +- interfaces that declare it +- reflection-based callers (`getMethod`, `getDeclaredField`, `Class.forName`) +- SQL function/operator registrations (`FunctionFactory`, `OperatorRegistry`) +- service loader entries + +For Rust, also search for: +- trait impls +- macro expansions +- JNI exports and their Java callers +- `extern "C"` boundaries + +A changed `pub`/`protected`/package-private symbol with zero recorded `rg` calls in the trace is a skill violation. The model is not allowed to assert "this is only used here" without showing the search. + +### 2.5c Implicit contract list + +For each changed symbol, walk this checklist and write one line per item, stating before vs after: + +- Panics or throws on which inputs +- Error variants returned and which `?`/`throws` chains propagate them +- Iteration order, sort stability, NULL ordering +- Idempotency and re-entrancy +- Lock acquisition order and which locks are held on return +- Allocation on hot vs compile-time path +- `Send`/`Sync`, thread-affinity, JFR/JNI thread attachment requirements +- Whether `null` and sentinel-NULL (`Numbers.LONG_NULL`, `Numbers.INT_NULL`, etc.) are still distinguished +- Cancellation/drop behavior (Rust) and finally/close behavior (Java) +- SQL: does the symbol now appear in new clauses (WHERE, GROUP BY, JOIN ON, ORDER BY, window frames, partition predicates, materialized view definitions) where it didn't before? List which. + +### 2.5d Cross-context exposure list + +End this step with an explicit list of "places this change is visible from but the diff does not touch". This is the highest-priority input for the bug-hunting reviewers in Step 3. + +The list groups the callsites from 2.5b by execution context: hot data paths, SQL compilation, async runtime, JNI boundary, replication, materialized views, parallel execution workers, etc. Every entry on this list must be reviewed in Step 3. + +## Step 3: Parallel review + +Launch the reviewers below with the `subagent` tool in `context: "fresh"` mode, in parallel (`subagent({ tasks: [...], context: "fresh", concurrency: N })`). Every reviewer task must include: +1. The PR diff +2. The full change surface map from Step 2.5 (semantic deltas, callsite inventory, implicit contracts, cross-context exposure list) + +(Exception: reviewer 10 receives only the diff and changed file names — see below.) + +### Anti-anchoring directive (applies to all reviewers) + +- **Bugs at callsites outside the diff outrank bugs inside the diff.** A confirmed bug in a file the PR did not touch but that calls a changed symbol is a P0 finding. +- **"Looks correct in isolation" is not a valid conclusion.** Before clearing a changed symbol, the reviewer must walk the callsite inventory from 2.5b and explicitly state, per callsite, whether the new behavior is still correct there. +- **The diff is the entry point, not the scope.** If the change surface map shows the symbol is reachable from N other files, the review covers N+1 files. +- A single finding of the form "in `FooReader.java` the new behavior of `Bar.x()` causes Y" is worth more than five findings inside the diff. + +### Reviewers + +Launch the following reviewers in parallel. + +**Reviewer 1 — Correctness & bugs:** NULL handling, edge cases, logic errors, off-by-one, operator precedence, error paths. Cross-reference every changed symbol against its callsite inventory and verify the new behavior is correct at each callsite. + +**Reviewer 2 — Concurrency:** Race conditions, shared mutable state, missing volatile, lock ordering, thread-safety of data structures. Use the implicit contract list (lock order, thread-affinity) and check every callsite from 2.5b for violations of the new contract. + +**Reviewer 3 — Performance & allocations:** Regressions, zero-GC violations, `java.util.*` collections vs `io.questdb.std`, string creation/concatenation on hot paths, SIMD opportunities. Algorithmic complexity: for each new loop, traversal, or data structure, analyze how it scales with data size (row count, partition count, join fan-out). Flag any O(n^2) or worse patterns that could regress on large tables (1M+ rows, 1000+ partitions). Check whether new code paths are compile-time-only or data-path — compile-time allocations are acceptable, data-path allocations are not. For changed symbols now reachable from new contexts (per 2.5d), check whether any of those new contexts is a hot path. + +**Reviewer 4 — Resource management:** Leaks on all code paths (especially errors), try-with-resources, native memory, pool management. Walk every callsite from 2.5b that constructs, owns, or transfers ownership of changed types and verify cleanup on all paths. + +**Reviewer 5 — Test review & coverage:** Coverage gaps, error path tests, NULL tests, boundary conditions, regression tests, test quality, `assertMemoryLeak()` usage. Cross-reference 2.5d: every cross-context exposure should have a test that exercises the changed symbol from that context. Missing tests for cross-context callsites is a high-priority finding. + +**Reviewer 6 — Code quality & standards:** Code smell, member ordering, naming conventions, modern Java features, dead code, third-party dependencies. + +**Reviewer 7 — PR metadata & conventions:** Title format, description quality, commit messages, labels, SQL style in tests. + +**Reviewer 8 — Rust safety (only if PR contains .rs files):** Check for any code that can panic at runtime — `unwrap()`, +`expect()`, array indexing without bounds checks, `panic!()`, `unreachable!()`, `todo!()`, integer overflow in release +mode, `slice::from_raw_parts` with invalid inputs. In mission-critical software a panic in Rust code called via JNI/FFI +will abort the entire JVM process with no recovery. Every fallible operation must use `Result`/`Option` with proper +error propagation. Flag every potential panic site. + +**Reviewer 9 — Cross-context caller impact:** Walk the callsite inventory from 2.5b. For every callsite, fetch the surrounding code (the calling function plus its callers up two levels) and answer: + +- Does this caller pass inputs the new behavior handles incorrectly? +- Does this caller depend on a contract from the implicit contract list (2.5c) that the change broke? +- Is this caller in a context (WHERE clause, async runtime, JNI thread, holding lock X, error path, hot loop, parallel worker, replication path, materialized view refresh) where the new behavior misbehaves even if the inputs are valid? +- For SQL functions/operators: is the symbol now resolvable in clauses where it didn't compile before (WHERE on indexed column, JOIN ON, GROUP BY key, ORDER BY, window frame, materialized view definition), and does it actually work there end to end? +- For changed Java methods overridden by subclasses: do all overrides still satisfy the new contract? +- For changed Rust types with trait impls: do all impls still satisfy the new invariants? +- For changed JNI signatures: do all Java callers pass the right types and lifetimes? + +This reviewer's output is structured per callsite, not per failure mode. Each callsite gets a verdict: SAFE / BROKEN / NEEDS VERIFICATION. Every BROKEN entry is a P0 finding regardless of whether the file is in the diff. + +This reviewer is not optional even when the diff is small. Small diffs to widely-used symbols have the largest blast radius. + +**Reviewer 10 — Fresh-context adversarial:** Dispatched separately from reviewers 1-9 to escape checklist anchoring. This reviewer operates under different rules from the rest: + +- It receives ONLY the PR diff and the names of the changed files. It does NOT receive the change surface map from Step 2.5, the implicit contract list, the cross-context exposure list, or any of the review checklists below. +- Its sole instruction: "find ways this code is wrong". No category list, no failure-mode taxonomy, no QuestDB-specific style guide. +- It is free to use `read` and `bash` (rg/grep/find) to explore the repository however it wants. +- Findings are not pre-classified by category. Each finding states: what's wrong, why it's wrong, and the code path that demonstrates it. + +The point of this reviewer is to surface bugs the structured reviewers cannot see because they are reasoning inside the same frame. A finding here that none of reviewers 1-9 produced is high signal — it means the structured review missed it. A finding here that overlaps with reviewers 1-9 is corroboration. + +Run this reviewer in parallel with reviewers 1-9. It is mandatory regardless of diff size. + +Combine all reviewer findings into a single deduplicated **draft** report. Do NOT present this draft to the user yet — it goes straight into verification. + +## Step 3b: Verify every finding against source code + +The parallel review reviewers work from the diff plus the change surface map and frequently produce false positives — especially around memory ownership, polymorphic dispatch, Rust control-flow guarantees, and JNI lifecycle conventions. Every finding MUST be verified before it is reported. + +For each finding in the draft report: + +1. **Read the actual source code** at the exact lines cited. Do not rely on the reviewer's description alone. +2. **Trace the full code path**: follow callers, inheritance hierarchies, and runtime types. A method called on a base-class reference may dispatch to a subclass override (e.g., `PartitionDescriptor.clear()` vs `OwnedMemoryPartitionDescriptor.clear()`). +3. **Check both sides of JNI/FFI boundaries**: if a finding involves Java↔Rust interaction, read both the Java caller and the Rust JNI function. Verify ownership transfer, error propagation, and cleanup on both sides. +4. **For resource leak claims**: trace every allocation to its corresponding free/close on ALL code paths (happy path, + error path, finally blocks). Check for polymorphic `close()`/`clear()` overrides. Before claiming a leak between + allocation and cleanup registration, verify that the intervening code can actually throw. +5. **For Rust panic claims**: verify whether the panic site is actually reachable. Trace control flow backwards — a + preceding guard or early return may make it unreachable. +6. **For Rust panic claims via JNI**: trace the Java caller to check whether it can actually pass parameters that + trigger the panic. If every caller validates inputs before the JNI call, the panic is unreachable — drop it. +7. **For Rust numeric overflow claims**: check whether the overflow is reachable at realistic scale. QuestDB handles + billions to a few trillion rows, thousands of tables, and thousands of columns — not billions of columns or + quintillions of rows. If overflow requires values beyond that scale, drop it. +8. **For performance claims**: check whether the cost is measurable in a realistic scenario. Downgrade to a nit if the + saving is negligible relative to the surrounding work. Exception: GC allocations on a hot path are always worth + flagging, even a single one. +9. **For cross-context findings (Reviewer 9)**: re-read the callsite in full, including its callers up two levels, and confirm the broken behavior is reachable from production code paths. Cross-context findings are high-value but also the easiest to overstate — verify carefully. +10. **Classify each finding** as: + - **CONFIRMED in-diff** — the bug is real and inside the diff + - **CONFIRMED at out-of-diff callsite** — the bug is in an unchanged file because the changed symbol is used there in a way that's now broken (cite the file and the contract from 2.5c that was violated) + - **FALSE POSITIVE** — the code is actually correct (explain why) + - **CONFIRMED with nuance** — the issue exists but is less severe than stated (explain) + +**Move false positives to a separate "Downgraded" section** at the end of the report. For each, give a one-line explanation of why it was dismissed. This lets the PR author verify the reasoning and catch verification mistakes. + +Launch verification reviewers (fresh-context `reviewer` subagents) in parallel where findings are independent. Each verification reviewer should read surrounding source files, not just the diff. + +## Review checklists + +Review the diff for: + +### Correctness & bugs +- NULL handling: distinguish sentinel NULL vs actual NULL +- Edge cases and error paths +- SqlException positions point at the offending character, not the expression start +- Logic errors, off-by-one, incorrect bounds, wrong operator precedence +- **Reachability expansion:** for each changed symbol, list the SQL clauses, async contexts, error paths, parallel workers, and lock-held states it can now appear in but didn't before. Verify it works in each. + +### Concurrency +- Race conditions: unsynchronized shared mutable state, missing volatile, unsafe publication +- Lock ordering issues that could cause deadlocks +- Thread-safety of data structures used across threads +- For every changed symbol, check whether it is now called from a thread or context (per 2.5d) where the previous concurrency assumptions don't hold + +### Performance +- Performance regressions: changes that make hot paths slower or increase complexity +- Unnecessary allocations on data paths (zero-GC requirement) +- Use of `java.util.*` collections (HashMap, ArrayList, etc.) instead of QuestDB's own zero-GC collections in `io.questdb.std` +- String creation or concatenation on hot paths (use CharSink, StringSink, or direct char[] instead) +- Capturing lambdas on hot paths — lambdas that capture local variables or instance fields allocate a new object on every invocation. Non-capturing lambdas (static method refs, no closed-over state) are safe as the JVM caches them. Flag any capturing lambda on a data path. +- Autoboxing on hot paths — primitive-to-wrapper conversions (`int` → `Integer`, `long` → `Long`, etc.) allocate silently. Watch for primitives passed to generic methods, stored in `java.util.*` collections, or returned from methods with wrapper return types. +- Missing SIMD or vectorization opportunities +- Inefficient algorithms where QuestDB already provides optimized alternatives +- Algorithmic complexity at scale: for each new loop or traversal, what is the time complexity as a function of row count, partition count, or join fan-out? Flag O(n^2) or worse patterns. Consider: what happens with 1M outer rows? 10K partitions? 100-way fan-out per row? +- Compile-time vs data-path distinction: allocations and O(n) scans during SQL compilation/optimization are acceptable; the same on per-row data paths are not + +### Code quality +- Code smell: overly complex methods, deep nesting, unclear intent, dead code +- No third-party Java dependencies on data paths + +### QuestDB coding standards +- Class members grouped by kind (static vs instance) and visibility, sorted alphabetically +- Boolean names use `is...` / `has...` prefix +- This module (`questdb-client`) targets Java 11 — only legacy Java features are available. Flag uses of enhanced switch, multiline strings (text blocks), or pattern variables in `instanceof`, since they will not compile here. + +### Resource management +- Resources properly closed in all code paths (especially error paths) +- try-with-resources used where applicable +- Native memory freed correctly + +### SQL conventions (if tests or SQL involved) +- Keywords in UPPERCASE +- `expr::TYPE` cast syntax preferred over CAST() +- Underscores in numbers >= 5 digits (e.g., 1_000_000) +- Multiline strings for complex queries +- No DELETE statements (suggest DROP PARTITION or soft delete) +- Tests use `assertMemoryLeak()`, `assertQueryNoLeakCheck()`, `execute()` for DDL +- Single INSERT for multiple rows + +### Enterprise permissions & ACL (if PR introduces new SQL statements or ALTER operations) +- New ALTER TABLE operations almost always require a new enterprise permission. If the PR adds a new ALTER statement (or any new SQL statement that modifies state), flag it if there is no corresponding `SecurityContext.authorize*()` call in the execution path. +- New features in OSS should have an enterprise counterpart that wires up ACL. Check whether the PR introduces `authorize*` methods in `SecurityContext` and whether all enterprise `SecurityContext` implementations (`EntSecurityContextBase`, `AdminSecurityContext`, `AbstractReplicaSecurityContext`, and test mocks) are updated. +- New permissions must be registered in `Permission.java` (constant, name maps, and included in `TABLE_PERMISSIONS`/`ALL_PERMISSIONS` as appropriate). +- The `PermissionParser` must be able to parse GRANT/REVOKE for the new permission name — especially if the name contains SQL keywords like `ON`, `TO`, or `FROM` that could conflict with parser grammar. +- Replica security contexts must deny new write operations (`deniedOnReplica()`). + +### Test review +- **Coverage gaps:** For every new or changed code path, verify a corresponding test exists. If not, flag it explicitly as "missing test for X". +- **Cross-context coverage:** For every entry in the cross-context exposure list (2.5d), verify a test exercises the changed symbol from that context. Missing cross-context tests are high-priority findings. +- **Error path coverage:** Are failure cases, exceptions, and edge conditions tested — not just the happy path? +- **NULL tests:** Are NULL inputs, NULL columns, and NULL expression results tested? +- **Boundary conditions:** Empty tables, empty partitions, single-row tables, max-value inputs, zero-length strings. +- **Concurrency tests:** If the code touches shared state, are there tests that exercise concurrent access? +- **Resource leak tests:** Tests must use `assertMemoryLeak()` for anything that allocates native memory. +- **Test quality:** Are tests actually asserting the right thing? Watch for tests that pass trivially, assert on wrong values, or test implementation details instead of behavior. +- **Regression tests:** If this PR fixes a bug, is there a test that reproduces the original bug and would fail without the fix? +- Use `read`/`bash` (rg/find) to find existing test files for the changed classes and verify they cover the new behavior. + +### Unresolved TODOs and FIXMEs +- Scan the diff for `TODO`, `FIXME`, `HACK`, `XXX`, and `WORKAROUND` comments. For each one found: + - Is it a pre-existing comment that was just moved/reformatted, or newly introduced in this PR? + - If newly introduced: does it represent unfinished work that should block the merge, or a known limitation that is acceptable to ship? Flag any that look like deferred bugs or incomplete implementations. + - If the TODO references a ticket/issue number, verify the reference exists. + +### Commit messages +- Plain English titles (no Conventional Commits prefix), under 50 chars +- Full long-form body description, line breaks at 72 chars +- Active voice, naming the acting subject + +## Step 4: Output + +Present ONLY verified findings (false positives are excluded). Structure as: + +### Critical +Issues that must be fixed before merge. Each must include: +- Exact file path and line numbers (including out-of-diff files) +- Whether the finding is **in-diff** or **out-of-diff** +- Code path trace showing why the bug is real +- For out-of-diff findings: the contract from 2.5c that was violated and the callsite that triggers it +- Suggested fix + +### Moderate +Issues worth addressing but not blocking. + +### Minor +Style nits and suggestions. + +### Downgraded (false positives) +Findings from the initial review that were dismissed after source code verification. For each, state: +- The original claim (one line) +- Why it was dismissed (one line, citing the specific code that disproves it) + +### Summary +- One-line verdict: approve, request changes, or needs discussion +- Highlight any regressions or tradeoffs +- State how many draft findings were verified vs dropped as false positives (e.g., "8 findings verified, 4 false positives removed") +- State the in-diff vs out-of-diff split (e.g., "5 findings in-diff, 3 findings out-of-diff"). If the diff is non-trivial and out-of-diff is zero, the cross-context pass likely underran — re-invoke Reviewer 9 with a wider grep before finalizing. From 322bb9f3ce429c4dcbce83013ad1ec957eec5416 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 15 Jun 2026 16:30:26 +0100 Subject: [PATCH 10/10] fix(qwp): key close() terminal surfacing on terminal delivery, not "any error ever" (C1) close() gated its terminal safety net on hasDeliveredToCustomHandler() -- a lifetime-sticky "the custom handler saw ANY error, ever" flag. A routine DROP_AND_CONTINUE rejection (e.g. SCHEMA_MISMATCH / WRITE_ERROR) flipped that flag permanently, after which a later genuine HALT terminal was silently lost from every synchronous close() channel: the step-2 checkUnsurfacedError() was skipped and drainOnClose() took its silent branch. This regressed the base, whose unconditional drain checkError() was a loud backstop. Fix: track whether the dispatcher actually delivered THE latched terminal (the exact SenderError instance, identity-matched) to a non-default handler, and gate close() on that instead. - SenderErrorDispatcher: add markTerminal(SenderError) (write-once) + a terminal-specific deliveredTerminalToCustomHandler flag set only when the marked terminal instance is delivered to a custom handler; hasDeliveredTerminalToCustomHandler() accessor. Keep the old hasDeliveredToCustomHandler() as an ops/diagnostic signal. - CursorWebSocketSendLoop.recordFatal: mark the dispatcher's terminal under the write-once latch guard (the same SenderError dispatchError() delivers). - QwpWebSocketSender.close(): gate step-2 and drainOnClose() on hasDeliveredTerminalToCustomHandler(). Because the flag flips only on actual delivery of the terminal, all three loss paths now surface loudly: the DROP-then-HALT conflation, the setErrorHandler(null) revert (terminal reaches only the default handler), and the slow-handler case (terminal not yet delivered at close() time, so it is rethrown before the dispatcher's 100ms drain deadline abandons the queue). The legitimate single-HALT case still suppresses the double-signal (M3). Adds CloseTerminalConflationTest: fails on the old "any error ever" gate (reproduces C1 via the deterministic setErrorHandler(null) path), passes with the fix. --- .../qwp/client/QwpWebSocketSender.java | 43 ++-- .../sf/cursor/CursorWebSocketSendLoop.java | 12 + .../sf/cursor/SenderErrorDispatcher.java | 59 ++++- .../client/CloseTerminalConflationTest.java | 215 ++++++++++++++++++ 4 files changed, 308 insertions(+), 21 deletions(-) create mode 100644 core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseTerminalConflationTest.java diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 16bf650a..2d729a49 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -959,23 +959,28 @@ public void close() { if (activeBuffer != null && activeBuffer.hasData()) { sealAndSwapBuffer(); } - // 2) Safety-net rethrow: surface a latched terminal error - // only when no other channel has already delivered it - // to the user. "Already delivered" means either the - // producer thread saw it synchronously via - // flush()/append() (checkUnsurfacedError is silent in - // that case) or the async dispatcher delivered it to a - // user-installed custom handler at any point in this - // sender's life (deliveredToCustomHandler, checked - // here). The latter survives a setErrorHandler(null) - // cleanup in test helpers -- once the user has owned - // an error, close() should not double-signal it. The - // default no-op logging handler does not count as - // "delivered to user", so a config-string-only caller - // still gets the loud rethrow on shutdown. - boolean alreadyDeliveredToCustomHandler = errorDispatcher != null - && errorDispatcher.hasDeliveredToCustomHandler(); - if (!alreadyDeliveredToCustomHandler) { + // 2) Safety-net rethrow: surface the latched terminal + // error only when no other channel has already + // delivered THIS terminal to the user. "Already + // delivered" means either the producer thread saw it + // synchronously via flush()/append() (checkUnsurfacedError + // is silent in that case) or the async dispatcher + // actually delivered the latched terminal to a + // user-installed custom handler + // (hasDeliveredTerminalToCustomHandler, checked here). + // The test is terminal-specific on purpose: an earlier + // routine DROP_AND_CONTINUE rejection delivered to the + // handler must NOT suppress a later genuine HALT + // terminal (the "any error ever" flag did, silently + // losing it). It also stays false when the terminal + // reached only the default handler after a + // setErrorHandler(null) revert, or is still + // queued/abandoned behind a slow handler -- so a + // config-string-only caller, and a reverting caller, + // both still get the loud rethrow on shutdown. + boolean terminalOwnedByCustomHandler = errorDispatcher != null + && errorDispatcher.hasDeliveredTerminalToCustomHandler(); + if (!terminalOwnedByCustomHandler) { cursorSendLoop.checkUnsurfacedError(); } // 3) Bounded drain: block until the server has ACK'd @@ -984,12 +989,12 @@ public void close() { // <= 0 opts out (fast close, may lose memory-mode // data on JVM exit). Pass the same ownership flag the // step-2 safety net used: when the custom handler - // already owns the terminal, the drain must stop on it + // already owns THIS terminal, the drain must stop on it // without re-throwing (re-throwing would double-signal // an error the user already handled). Otherwise the // drain keeps the loud safety net and surfaces it. if (closeFlushTimeoutMillis > 0L) { - drainOnClose(alreadyDeliveredToCustomHandler); + drainOnClose(terminalOwnedByCustomHandler); } } } catch (Throwable t) { diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java index 77db27c8..2003aa08 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java @@ -1207,6 +1207,18 @@ private void recordFatal(Throwable t) { terminalError = t instanceof LineSenderException ? (LineSenderException) t : new LineSenderException("I/O thread failed: " + t.getMessage(), t); + // Tell the async dispatcher which SenderError IS the terminal, so + // close() can distinguish "the custom handler owns THIS terminal" + // from "the custom handler saw some earlier DROP_AND_CONTINUE". + // Same instance dispatchError() delivers (the err wrapped here), + // so the dispatcher's identity compare matches. Marked under the + // write-once latch guard so a stray later HALT cannot re-point it. + if (terminalError instanceof LineSenderServerException) { + SenderErrorDispatcher d = errorDispatcher; + if (d != null) { + d.markTerminal(((LineSenderServerException) terminalError).getServerError()); + } + } } running = false; if (t instanceof LineSenderServerException) { diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java index aef01dde..5d31f3d7 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java @@ -80,9 +80,17 @@ public final class SenderErrorDispatcher implements QuietCloseable { // Set the first time the dispatcher delivers an error to a non-default // handler. Stays true even if the user later swaps the handler back to // the default -- the signal is "did the user-installed handler ever see - // this stream of errors", consulted by close() to decide whether the - // safety-net rethrow is still needed. + // this stream of errors". Kept for ops/diagnostic visibility; NOT used + // by close() for the safety-net decision -- see + // deliveredTerminalToCustomHandler for why "any error ever" is too coarse. private final AtomicBoolean deliveredToCustomHandler = new AtomicBoolean(); + // Set the first time the dispatcher delivers THE terminal error (the exact + // SenderError the I/O loop latched via recordFatal, marked here through + // markTerminal) to a non-default handler. This -- not + // deliveredToCustomHandler -- is what close() consults: a routine + // DROP_AND_CONTINUE rejection delivered earlier must NOT suppress the + // close() safety net for a later, genuinely-unsurfaced HALT terminal. + private final AtomicBoolean deliveredTerminalToCustomHandler = new AtomicBoolean(); private final AtomicLong dropped = new AtomicLong(); // volatile so the user can swap the handler post-connect, mirroring // SenderProgressDispatcher. A final field would make handler config a @@ -100,6 +108,13 @@ public final class SenderErrorDispatcher implements QuietCloseable { // wins the race to spawn it. private final Object lock = new Object(); private final String threadName; + // The exact SenderError instance the I/O loop latched as terminal, set + // once via markTerminal (first-writer-wins, mirroring recordFatal's + // latch). The dispatch loop identity-compares delivered errors against + // it so only delivery of THIS terminal -- not an earlier + // DROP_AND_CONTINUE -- flips deliveredTerminalToCustomHandler. volatile: + // written on the I/O thread, read on the dispatcher thread. + private volatile SenderError terminalServerError; private final AtomicLong totalDelivered = new AtomicLong(); private volatile boolean closed; // volatile to give the off-lock read in offer() a happens-before with @@ -206,6 +221,39 @@ public boolean hasDeliveredToCustomHandler() { return deliveredToCustomHandler.get(); } + /** + * True once the dispatcher has actually delivered THE terminal error -- the + * exact {@link SenderError} the I/O loop latched and passed to + * {@link #markTerminal} -- to a user-installed (non-default) handler. + *

    + * This is the signal {@code QwpWebSocketSender.close()} uses to decide + * whether its safety-net rethrow is still needed. Unlike + * {@link #hasDeliveredToCustomHandler()} ("any error ever"), this stays + * {@code false} when the only thing the custom handler saw was an earlier + * {@code DROP_AND_CONTINUE} rejection, or when the terminal reached only + * the default handler after a {@code setErrorHandler(null)} revert, or when + * the terminal is still queued/abandoned because the handler is slow. In + * all those cases close() must still surface the terminal loudly. + */ + public boolean hasDeliveredTerminalToCustomHandler() { + return deliveredTerminalToCustomHandler.get(); + } + + /** + * Record the exact {@link SenderError} instance the I/O loop latched as its + * terminal failure, so the dispatch loop can recognise it on delivery. + * Called by {@code CursorWebSocketSendLoop.recordFatal} on the I/O thread + * before the matching {@link #offer}, so the marker is visible by the time + * the dispatcher delivers it. First-writer-wins, mirroring recordFatal's + * own write-once latch -- a stray later HALT cannot re-point the marker at + * an error that is not the latched terminal. + */ + public void markTerminal(SenderError err) { + if (terminalServerError == null) { + terminalServerError = err; + } + } + /** * Replace the user-supplied handler. Effective for the next delivery. * Null reverts to the loud-not-silent default. @@ -278,6 +326,13 @@ private void dispatchLoop() { SenderErrorHandler h = handler; if (h != DefaultSenderErrorHandler.INSTANCE) { deliveredToCustomHandler.set(true); + // Identity match: only THIS delivery of the latched terminal + // counts as the custom handler owning the terminal. An earlier + // DROP_AND_CONTINUE (err != terminalServerError) does not, so + // close() will not suppress a later genuine terminal. + if (err == terminalServerError) { + deliveredTerminalToCustomHandler.set(true); + } } try { h.onError(err); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseTerminalConflationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseTerminalConflationTest.java new file mode 100644 index 00000000..76d2d9cf --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseTerminalConflationTest.java @@ -0,0 +1,215 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.Sender; +import io.questdb.client.SenderError; +import io.questdb.client.SenderErrorHandler; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.WebSocketResponse; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Regression test for finding C1 (close-error-race review): close() + * keyed its terminal safety-net on a sticky "the custom handler saw ANY error, + * ever" flag ({@code SenderErrorDispatcher.hasDeliveredToCustomHandler()}) + * rather than "the custom handler owns THIS terminal". A routine + * {@code DROP_AND_CONTINUE} rejection set that flag permanently, after which a + * later genuine HALT terminal was silently dropped from every synchronous + * close() channel. + *

    + * This test drives the deterministic, public-API path: + *

      + *
    1. a {@code WRITE_ERROR} (DROP_AND_CONTINUE) rejection is delivered to a + * custom handler — flipping the old "any error ever" flag;
    2. + *
    3. {@code setErrorHandler(null)} reverts to the loud-not-silent default — + * which does NOT reset the sticky flag;
    4. + *
    5. a {@code PARSE_ERROR} (HALT) terminal latches with an unacked tail and + * is dispatched only to the default handler.
    6. + *
    + * The old code returned from {@code close()} silently (flag still true -> + * step-2 safety net skipped, drain took its silent branch). The fix gates on + * {@code hasDeliveredTerminalToCustomHandler()} — true only when the latched + * terminal itself reached a custom handler — so close() loudly rethrows the + * HALT the user never saw. + */ +public class CloseTerminalConflationTest { + + @Test(timeout = 30_000) + public void testCloseSurfacesHaltAfterEarlierDropFlippedTheStickyFlag() throws Exception { + int port = TestPorts.findUnusedPort(); + DropThenGatedHaltHandler server = new DropThenGatedHaltHandler(); + try (TestWebSocketServer ws = new TestWebSocketServer(port, server)) { + ws.start(); + Assert.assertTrue(ws.awaitStart(5, TimeUnit.SECONDS)); + + // Memory mode + a positive drain timeout: drainOnClose() WILL run. + String cfg = "ws::addr=localhost:" + port + + ";close_flush_timeout_millis=2000;"; + + ErrorInbox customInbox = new ErrorInbox(); + Sender sender = Sender.builder(cfg).errorHandler(customInbox).build(); + QwpWebSocketSender wss = (QwpWebSocketSender) sender; + try { + // Batch 1: server replies WRITE_ERROR (DROP_AND_CONTINUE). The + // loop drops the batch, advances the ack watermark and keeps + // running; the dispatch reaches the custom handler -> the old + // "delivered to custom handler" flag flips true. + sender.table("foo").longColumn("v", 1L).atNow(); + sender.flush(); + + Assert.assertTrue( + "precondition: DROP_AND_CONTINUE must reach the custom handler within 10s", + customInbox.await(10, TimeUnit.SECONDS)); + Assert.assertEquals( + "precondition: status 0x09 maps to WRITE_ERROR", + SenderError.Category.WRITE_ERROR, customInbox.get().getCategory()); + Assert.assertEquals( + "precondition: WRITE_ERROR defaults to DROP_AND_CONTINUE", + SenderError.Policy.DROP_AND_CONTINUE, customInbox.get().getAppliedPolicy()); + Assert.assertNull( + "precondition: a DROP must NOT latch a terminal error", + wss.getLastTerminalError()); + + // Revert to the loud-not-silent default handler. Per its + // contract this is exactly the move a caller makes to get a + // loud shutdown -- but it does not reset the sticky flag. + wss.setErrorHandler(null); + + // Batch 2: published while the HALT is gated, so flush()'s own + // checkError() runs clean and nothing is surfaced + // synchronously. publishedFsn -> 2, ackedFsn stays at 1 once + // the HALT lands -> the unacked-tail precondition for the drain. + sender.table("foo").longColumn("v", 2L).atNow(); + sender.flush(); + + // Release the HALT. It latches the terminal (recordFatal) and + // is dispatched ONLY to the default handler -- never to a + // custom handler -- so close() must surface it loudly. + server.releaseHalt(); + awaitLatchedTerminal(wss); + + try { + sender.close(); + Assert.fail("C1: close() silently dropped a HALT terminal. The custom handler " + + "only ever saw an earlier DROP_AND_CONTINUE, and the terminal went to " + + "the default handler after setErrorHandler(null); close() must rethrow it."); + } catch (LineSenderException expected) { + // Correct: the unsurfaced terminal is loud on shutdown. + } + } finally { + server.releaseHalt(); + sender.close(); + } + } + } + + private static void awaitLatchedTerminal(QwpWebSocketSender sender) { + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (sender.getLastTerminalError() == null) { + if (System.nanoTime() > deadlineNanos) { + throw new AssertionError("I/O thread did not latch a terminal within 10s"); + } + Thread.onSpinWait(); + } + } + + // Mirrors WebSocketResponse error layout: status u8 | seq u64 LE | msgLen u16 LE | msg UTF-8 + private static byte[] buildErrorAck(long seq, byte status, String msg) { + byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8); + byte[] buf = new byte[1 + 8 + 2 + msgBytes.length]; + ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN); + bb.put(status); + bb.putLong(seq); + bb.putShort((short) msgBytes.length); + bb.put(msgBytes); + return buf; + } + + /** + * First binary frame -> WRITE_ERROR (DROP_AND_CONTINUE) immediately. + * Second binary frame -> PARSE_ERROR (HALT), but only once the test + * releases the gate, so the rejection reaches the user purely through the + * shutdown path (not through flush()'s synchronous checkError()). + */ + private static final class DropThenGatedHaltHandler implements TestWebSocketServer.WebSocketServerHandler { + private final CountDownLatch haltGate = new CountDownLatch(1); + private final AtomicLong nextSeq = new AtomicLong(); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + try { + long seq = nextSeq.getAndIncrement(); + if (seq == 0) { + client.sendBinary(buildErrorAck(seq, + WebSocketResponse.STATUS_WRITE_ERROR, "test: write error (DROP_AND_CONTINUE)")); + } else { + haltGate.await(); + client.sendBinary(buildErrorAck(seq, + WebSocketResponse.STATUS_PARSE_ERROR, "test: parse error (HALT)")); + } + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + void releaseHalt() { + haltGate.countDown(); + } + } + + private static final class ErrorInbox implements SenderErrorHandler { + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicReference ref = new AtomicReference<>(); + + boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + SenderError get() { + return ref.get(); + } + + @Override + public void onError(SenderError err) { + if (ref.compareAndSet(null, err)) { + latch.countDown(); + } + } + } +}