339 changes: 339 additions & 0 deletions .pi/skills/review-pr/SKILL.md

Large diffs are not rendered by default.

@@ -928,14 +928,20 @@ public void close() {
  // SCHEMA_MISMATCH HALT) from users who only call close() and
  // never call flush() afterwards.
  Throwable terminalError = null;
- // Snapshot the latched terminal error that the user thread has
- // ALREADY caught (via flush()/at()) before close() ran. If
- // flushPendingRows/drainOnClose below also rethrow the same
+ // Snapshot the exact terminal error instance that a user-thread
+ // API call ALREADY caught (via flush()/at()) before close() ran.
+ // If flushPendingRows/drainOnClose below also rethrow the same
  // instance, dropping it at the final rethrow avoids
  // try-with-resources self-suppression: Throwable.addSuppressed
  // raises IllegalArgumentException when primary == suppressed.
- Throwable alreadyOwnedByUser = (cursorSendLoop != null && !cursorSendLoop.hasUnsurfacedError())
- ? cursorSendLoop.getLastError() : null;
+ // Must stay this single read: the snapshot needs the identity of
+ // the error the user already owns, and only
+ // getSynchronouslySurfacedError() holds it. Deriving it from two
+ // separate latch reads races the I/O thread -- a terminal latched
+ // between the reads would be adopted as user-owned and silently
+ // dropped (see CloseOwnershipRaceTest).
+ Throwable alreadyOwnedByUser = cursorSendLoop != null
+ ? cursorSendLoop.getSynchronouslySurfacedError() : null;

  try {
  // Only drain when both the engine and the I/O loop are wired
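
The self-suppression hazard described in the comment above can be reproduced in isolation. The following is a minimal sketch, not the PR's code: `SelfSuppressionDemo` and `errorToRethrow` are hypothetical names, and the identity check only approximates what the final rethrow in `close()` is described as doing with `alreadyOwnedByUser`.

```java
// Minimal sketch; SelfSuppressionDemo and errorToRethrow are hypothetical
// names introduced for illustration, not part of the sender under review.
final class SelfSuppressionDemo {

    /** Returns true if Throwable.addSuppressed rejects an exception suppressing itself. */
    static boolean selfSuppressionRejected() {
        RuntimeException primary = new RuntimeException("terminal");
        try {
            // This is the call try-with-resources would make if close()
            // rethrew the very instance that is already the primary exception.
            primary.addSuppressed(primary);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /**
     * Hypothetical stand-in for the final rethrow decision: drop the latched
     * error only when it is the exact instance the caller already caught.
     * Returns the error to rethrow, or null when nothing should be thrown.
     */
    static Throwable errorToRethrow(Throwable latched, Throwable alreadyOwnedByUser) {
        return latched == alreadyOwnedByUser ? null : latched;
    }

    public static void main(String[] args) {
        RuntimeException terminal = new RuntimeException("terminal");
        System.out.println(selfSuppressionRejected());
        System.out.println(errorToRethrow(terminal, terminal) == null);
        System.out.println(errorToRethrow(terminal, null) == terminal);
    }
}
```

The comparison is by reference rather than `equals()`, because the failure in `addSuppressed` is triggered by instance identity and not by message or type.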
@@ -953,35 +959,42 @@ public void close() {
  if (activeBuffer != null && activeBuffer.hasData()) {
  sealAndSwapBuffer();
  }
- // 2) Safety-net rethrow: surface a latched terminal error
- // only when no other channel has already delivered it
- // to the user. "Already delivered" means either the
- // producer thread saw it synchronously via
- // flush()/append() (errorSurfacedSynchronously) or the
- // async dispatcher delivered it to a user-installed
- // custom handler at any point in this sender's life
- // (deliveredToCustomHandler). The latter survives a
- // setErrorHandler(null) cleanup in test helpers --
- // once the user has owned an error, close() should
- // not double-signal it. The default no-op logging
- // handler does not count as "delivered to user", so a
- // config-string-only caller still gets the loud
- // rethrow on shutdown.
- boolean alreadyDeliveredToCustomHandler = errorDispatcher != null
- && errorDispatcher.hasDeliveredToCustomHandler();
- if (!alreadyDeliveredToCustomHandler
- && cursorSendLoop.hasUnsurfacedError()) {
- cursorSendLoop.checkError();
+ // 2) Safety-net rethrow: surface the latched terminal
+ // error only when no other channel has already
+ // delivered THIS terminal to the user. "Already
+ // delivered" means either the producer thread saw it
+ // synchronously via flush()/append() (checkUnsurfacedError
+ // is silent in that case) or the async dispatcher
+ // actually delivered the latched terminal to a
+ // user-installed custom handler
+ // (hasDeliveredTerminalToCustomHandler, checked here).
+ // The test is terminal-specific on purpose: an earlier
+ // routine DROP_AND_CONTINUE rejection delivered to the
+ // handler must NOT suppress a later genuine HALT
+ // terminal (the "any error ever" flag did, silently
+ // losing it). It also stays false when the terminal
+ // reached only the default handler after a
+ // setErrorHandler(null) revert, or is still
+ // queued/abandoned behind a slow handler -- so a
+ // config-string-only caller, and a reverting caller,
+ // both still get the loud rethrow on shutdown.
+ boolean terminalOwnedByCustomHandler = errorDispatcher != null
+ && errorDispatcher.hasDeliveredTerminalToCustomHandler();
+ if (!terminalOwnedByCustomHandler) {
+ cursorSendLoop.checkUnsurfacedError();
  }
  // 3) Bounded drain: block until the server has ACK'd
  // everything we just published, or until the
  // configured timeout elapses. closeFlushTimeoutMillis
  // <= 0 opts out (fast close, may lose memory-mode
- // data on JVM exit). Errors still surface via the
- // safety-net checkError() above and via the async
- // error handler.
+ // data on JVM exit). Pass the same ownership flag the
+ // step-2 safety net used: when the custom handler
+ // already owns THIS terminal, the drain must stop on it
+ // without re-throwing (re-throwing would double-signal
+ // an error the user already handled). Otherwise the
+ // drain keeps the loud safety net and surfaces it.
  if (closeFlushTimeoutMillis > 0L) {
- drainOnClose();
+ drainOnClose(terminalOwnedByCustomHandler);
  }
  }
  } catch (Throwable t) {
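
The difference between the old "any error ever" flag and the terminal-specific flag in step 2 can be shown with a small model. This is a sketch under loud assumptions: `TerminalOwnershipSketch`, `recordDelivery`, `oldCloseRethrows` and `newCloseRethrows` are hypothetical, only the policy names `DROP_AND_CONTINUE` and `HALT` come from the comments in the diff, and the synchronous `flush()`/`at()` channel is deliberately left out.

```java
// Simplified model of the step-2 decision. All class and method names are
// hypothetical; the real dispatcher tracks delivery of the latched terminal
// itself, which this sketch approximates with a boolean per policy.
final class TerminalOwnershipSketch {
    enum Policy { DROP_AND_CONTINUE, HALT }

    private boolean deliveredAnyToCustomHandler;      // models the old, too-broad flag
    private boolean deliveredTerminalToCustomHandler; // models the terminal-specific flag

    /** Records that an error with the given policy reached a handler. */
    void recordDelivery(Policy policy, boolean customHandlerInstalled) {
        if (!customHandlerInstalled) {
            return; // the default handler never counts as "delivered to user"
        }
        deliveredAnyToCustomHandler = true;
        if (policy == Policy.HALT) {
            deliveredTerminalToCustomHandler = true;
        }
    }

    /** Old behaviour: any earlier custom-handler delivery silences close(). */
    boolean oldCloseRethrows(boolean terminalLatched) {
        return terminalLatched && !deliveredAnyToCustomHandler;
    }

    /** New behaviour: only delivery of the terminal itself silences close(). */
    boolean newCloseRethrows(boolean terminalLatched) {
        return terminalLatched && !deliveredTerminalToCustomHandler;
    }

    public static void main(String[] args) {
        TerminalOwnershipSketch s = new TerminalOwnershipSketch();
        // A routine rejection reaches the custom handler ...
        s.recordDelivery(Policy.DROP_AND_CONTINUE, true);
        // ... then a HALT is latched but never reaches that handler.
        System.out.println(s.oldCloseRethrows(true));
        System.out.println(s.newCloseRethrows(true));
    }
}
```

In this scenario the old check reports no rethrow, which is the silently lost terminal the comment describes, while the terminal-specific check still rethrows.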
@@ -2558,7 +2571,7 @@ private void checkConnectionError() {
  error.fillInStackTrace();
  throw error;
  }
- // Poll the cursor I/O loop's lastError too. Without this, a fatal
+ // Poll the cursor I/O loop's terminalError too. Without this, a fatal
  // wire / server-rejection error recorded by the I/O thread would
  // only surface on the next flush() / close() — every row-level
  // method (table, longColumn, atNow, etc.) routes through
@@ -2638,8 +2651,30 @@ private void dispatchConnectionEvent(
  * SF-mode users can recover the unacked tail by reopening a sender on
  * the same SF directory; memory-mode users have no recovery path and
  * must treat this as fatal.
+ * <p>
+ * A latched terminal error means the server will never ACK up to
+ * {@code target}, so the drain must stop on it regardless. Whether it is
+ * also re-thrown from close() is a separate surfacing policy that mirrors
+ * the step-2 safety net in {@link #close()}:
+ * <ul>
+ * <li>{@code errorOwnedByCustomHandler == true}: a custom error handler
+ * has already delivered this terminal to the user, so stop silently —
+ * re-throwing here would double-signal it (the M3 drainOnClose
+ * double-signal).</li>
+ * <li>{@code errorOwnedByCustomHandler == false}: re-throw via
+ * {@code checkError()} to preserve the loud safety net (a
+ * config-string-only caller's only channel). The throw also breaks the
+ * loop; an error a synchronous {@code flush()}/{@code at()} caller
+ * already owns is then suppressed by close()'s
+ * {@code terminalError == alreadyOwnedByUser} check, so it is not
+ * double-signalled either.</li>
+ * </ul>
+ *
+ * @param errorOwnedByCustomHandler whether the async dispatcher has
+ * already delivered a terminal to a
+ * user-installed handler
  */
- private void drainOnClose() {
+ private void drainOnClose(boolean errorOwnedByCustomHandler) {
  if (closeFlushTimeoutMillis <= 0L) {
  return;
  }
@@ -2649,7 +2684,15 @@ private void drainOnClose() {
  }
  long deadlineNanos = System.nanoTime() + closeFlushTimeoutMillis * 1_000_000L;
  while (cursorEngine.ackedFsn() < target) {
- cursorSendLoop.checkError();
+ // Stop on a latched terminal (acks will never reach target);
+ // surface it only when no other channel already delivered it.
+ if (errorOwnedByCustomHandler) {
+ if (cursorSendLoop.getTerminalError() != null) {
+ return;
+ }
+ } else {
+ cursorSendLoop.checkError();
+ }
  if (System.nanoTime() >= deadlineNanos) {
  long acked = cursorEngine.ackedFsn();
  LOG.warn("close() drain timed out after {}ms [target={} acked={}], pending data may be lost",
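
The control flow of the bounded drain in this hunk can be sketched as a standalone loop. Everything here is an assumption made for illustration: `BoundedDrainSketch`, `Outcome` and the supplier parameters are hypothetical stand-ins for `cursorEngine.ackedFsn()` and `cursorSendLoop.getTerminalError()`, and the sketch busy-waits instead of reproducing whatever waiting strategy the real loop uses.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of a bounded drain with an ownership flag; the
// suppliers stand in for the engine and I/O loop, which are not shown here.
final class BoundedDrainSketch {
    enum Outcome { DRAINED, STOPPED_ON_OWNED_TERMINAL, TIMED_OUT }

    static Outcome drain(LongSupplier ackedFsn,
                         long target,
                         Supplier<RuntimeException> terminalError,
                         boolean errorOwnedByCustomHandler,
                         long timeoutMillis) {
        long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (ackedFsn.getAsLong() < target) {
            RuntimeException terminal = terminalError.get();
            if (terminal != null) {
                // Acks will never reach target, so the loop must end either way.
                if (errorOwnedByCustomHandler) {
                    return Outcome.STOPPED_ON_OWNED_TERMINAL; // already handled: stay silent
                }
                throw terminal; // loud safety net for callers with no other channel
            }
            // Subtraction-based comparison, as the System.nanoTime() docs
            // recommend, stays correct across numerical overflow.
            if (System.nanoTime() - deadlineNanos >= 0) {
                return Outcome.TIMED_OUT;
            }
            Thread.onSpinWait();
        }
        return Outcome.DRAINED;
    }

    public static void main(String[] args) {
        RuntimeException halt = new RuntimeException("HALT");
        System.out.println(drain(() -> 10L, 10L, () -> null, false, 50L));
        System.out.println(drain(() -> 0L, 10L, () -> halt, true, 50L));
        System.out.println(drain(() -> 0L, 10L, () -> null, false, 20L));
    }
}
```

Returning an outcome instead of logging keeps the sketch testable. The PR's method is `void` and logs a warning on timeout.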