This repository was archived by the owner on Feb 16, 2026. It is now read-only.

[Detail Bug] Producer: acks drained into claimable_tickets are not propagated on error #282

@detail-app

Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_23a4f33d-e08f-48b6-a59c-38fb0eaf7149

Summary

  • Context: The Producer batches individual AppendRecords and uses an internal async run loop to manage pending acknowledgments, batch submissions, and error handling.
  • Bug: When an error occurs after record acknowledgments have been drained into claimable_tickets but before those tickets are processed, those acknowledgments are never sent to the waiting callers.
  • Actual vs. expected: The acknowledgments captured in claimable_tickets are silently dropped instead of being notified with the error, while acknowledgments still in pending_acks are correctly notified.
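  • Impact: Record submitters never receive the terminal error for those records. Because the senders are dropped rather than signaled, callers see a generic dropped-channel error or a fallback terminal error instead of the specific failure (see Logical proof, step 4).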

Code with bug

claimable_tickets.push(ticket.map({
    let pending_acks = pending_acks.drain(..batch_len).collect::<Vec<_>>(); // <-- BUG 🔴 Drains acks immediately into closure
    |batch_ack| (batch_ack, pending_acks)
}));

Err(err) => {
    propagate_terminal_error(
        err.into(),
        &terminal_err,
        &mut pending_acks,  // <-- BUG 🔴 Does not handle acks already in claimable_tickets
        &mut stashed_submission,
        &mut close_tx,
        &mut cmd_rx,
    )
    .await;
    return;
}

async fn propagate_terminal_error(
    err: S2Error,
    terminal_err: &OnceLock<S2Error>,
    pending_acks: &mut VecDeque<PendingRecordAck>,
    stashed_submission: &mut Option<StashedSubmission>,
    close_tx: &mut Option<oneshot::Sender<Result<(), S2Error>>>,
    cmd_rx: &mut mpsc::Receiver<Command>,
) {
    let _ = terminal_err.set(err.clone());
    for pending in pending_acks.drain(..) {  // <-- BUG 🔴 Acks already moved to claimable_tickets are never notified
        let _ = pending.ack_tx.send(Err(err.clone()));
    }
    // ... rest of function doesn't handle claimable_tickets
}

Logical proof

  1. When pushing a BatchSubmitTicket into claimable_tickets, pending_acks.drain(..batch_len) executes immediately, moving those acks into a closure captured by the mapped future.
  2. On error, propagate_terminal_error only iterates pending_acks, sending errors to acks that remain there. It has no access to acks already drained into claimable_tickets.
  3. The run loop returns after calling propagate_terminal_error. The futures in claimable_tickets are dropped without being polled, so their closures never run and the captured acks never have their senders invoked.
  4. Those oneshot senders are dropped without sending, so callers observe inconsistent behavior (e.g., a generic dropped error or terminal error via fallback) rather than the specific error being propagated as intended.
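
The four steps above can be reproduced in a small synchronous model. This is only a sketch: it uses std::sync::mpsc channels in place of the producer's oneshot channels, a boxed closure in place of the mapped ticket future, and the names PendingAck and buggy_error_path are hypothetical. The relevant behavior is the same in both channel types: a receiver whose sender was dropped without sending gets a receive error, not the terminal error.

```rust
use std::sync::mpsc::{channel, RecvError, Sender};

// Simplified ack payload: Ok(sequence number) or Err(terminal error message).
type Ack = Result<u64, String>;

// Hypothetical stand-in for the producer's per-record ack slot.
struct PendingAck {
    ack_tx: Sender<Ack>,
}

/// Models the buggy error path. Returns what the caller of the ack that was
/// still queued observes, followed by what the caller of the ack that was
/// already captured by a ticket observes.
fn buggy_error_path(err: &str) -> (Result<Ack, RecvError>, Result<Ack, RecvError>) {
    let (captured_tx, captured_rx) = channel();
    let (queued_tx, queued_rx) = channel();
    let mut pending_acks = vec![
        PendingAck { ack_tx: captured_tx },
        PendingAck { ack_tx: queued_tx },
    ];

    // Step 1: the drain runs immediately and the ack moves into the closure.
    let captured: Vec<PendingAck> = pending_acks.drain(..1).collect();
    let ticket: Box<dyn FnOnce(u64) -> (u64, Vec<PendingAck>)> =
        Box::new(move |batch_ack| (batch_ack, captured));

    // Step 2: the error path only sees what is left in `pending_acks`.
    for pending in pending_acks.drain(..) {
        let _ = pending.ack_tx.send(Err(err.to_string()));
    }

    // Step 3: the ticket is dropped without ever being invoked.
    drop(ticket);

    // Step 4: the captured sender is gone, so its receiver sees a receive error.
    (queued_rx.recv(), captured_rx.recv())
}
```

Calling buggy_error_path("session closed") yields the terminal error for the queued ack and a bare RecvError for the captured one, which is the inconsistency described in step 4.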

Recommended fix

Update error propagation to include claimable_tickets: accept it as a parameter and drain it, notifying all captured acks with the error before exiting.

async fn propagate_terminal_error<F, T>(
    err: S2Error,
    terminal_err: &OnceLock<S2Error>,
    pending_acks: &mut VecDeque<PendingRecordAck>,
    // `_` is not allowed in a function signature, so the ticket future type is generic.
    claimable_tickets: &mut FuturesUnordered<F>,
    stashed_submission: &mut Option<StashedSubmission>,
    close_tx: &mut Option<oneshot::Sender<Result<(), S2Error>>>,
    cmd_rx: &mut mpsc::Receiver<Command>,
) where
    F: Future<Output = (T, Vec<PendingRecordAck>)>,
{
    let _ = terminal_err.set(err.clone());

    for pending in pending_acks.drain(..) {
        let _ = pending.ack_tx.send(Err(err.clone()));
    }

    // Requires `futures::StreamExt` in scope for `.next()`.
    while let Some((_, ticket_acks)) = claimable_tickets.next().await { // <-- FIX 🟢 Drain and notify acks captured in claimable_tickets
        for pending in ticket_acks {
            let _ = pending.ack_tx.send(Err(err.clone()));
        }
    }

    // ... existing handling of stashed_submission, close_tx, and cmd_rx
}
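
One consequence of the fix above is that draining claimable_tickets with .next().await waits for each ticket to resolve before its acks can be notified. Whether tickets resolve promptly after a terminal error is not stated in this report. As a point of comparison only, here is a synchronous sketch of an alternative shape in which the acks stay reachable as plain data next to the ticket, so the error path can notify them without polling anything. It uses std::sync::mpsc in place of oneshot channels, and PendingAck, InFlightTicket, new_ack, and propagate_error are all hypothetical names, not part of the producer.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Simplified ack payload: Ok(sequence number) or Err(terminal error message).
type Ack = Result<u64, String>;

// Hypothetical stand-in for the producer's per-record ack slot.
struct PendingAck {
    ack_tx: Sender<Ack>,
}

// Hypothetical in-flight ticket that stores its acks as data instead of
// hiding them inside a closure captured by a future.
struct InFlightTicket {
    acks: Vec<PendingAck>,
}

// Builds an ack slot together with the receiver its caller would wait on.
fn new_ack() -> (PendingAck, Receiver<Ack>) {
    let (ack_tx, ack_rx) = channel();
    (PendingAck { ack_tx }, ack_rx)
}

/// Sends `err` to every ack, in-flight acks first (they belong to older
/// records), then the ones still queued. Returns how many were notified.
fn propagate_error(
    err: &str,
    pending_acks: &mut VecDeque<PendingAck>,
    claimable: &mut Vec<InFlightTicket>,
) -> usize {
    let mut notified = 0;
    let in_flight = claimable.drain(..).flat_map(|ticket| ticket.acks);
    for pending in in_flight.chain(pending_acks.drain(..)) {
        // A send error only means the caller stopped waiting; ignore it.
        let _ = pending.ack_tx.send(Err(err.to_string()));
        notified += 1;
    }
    notified
}
```

With two acks held by an in-flight ticket and one still queued, propagate_error notifies all three with the same error and leaves both collections empty. The trade-off is that the ticket and its acks must then be matched up again on the success path, which the closure-based version gets for free.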
