This repository was archived by the owner on Feb 16, 2026. It is now read-only.
[Detail Bug] Producer: acks drained into claimable_tickets are not propagated on error #282
Closed
Description
Detail Bug Report
Summary
- Context: The `Producer` batches individual `AppendRecord`s and uses an internal async run loop to manage pending acknowledgments, batch submissions, and error handling.
- Bug: When an error occurs after record acknowledgments have been drained into `claimable_tickets` but before those tickets are processed, those acknowledgments are never sent to the waiting callers.
- Actual vs. expected: The acknowledgments captured in `claimable_tickets` are silently dropped instead of being notified with the error, while acknowledgments still in `pending_acks` are correctly notified.
- Impact: Record submitters will wait indefinitely for acknowledgments that will never arrive, causing hangs or timeouts in client applications.
Code with bug
claimable_tickets.push(ticket.map({
    let pending_acks = pending_acks.drain(..batch_len).collect::<Vec<_>>(); // <-- BUG 🔴 Drains acks immediately into closure
    |batch_ack| (batch_ack, pending_acks)
}));

Err(err) => {
    propagate_terminal_error(
        err.into(),
        &terminal_err,
        &mut pending_acks, // <-- BUG 🔴 Does not handle acks already in claimable_tickets
        &mut stashed_submission,
        &mut close_tx,
        &mut cmd_rx,
    )
    .await;
    return;
}

async fn propagate_terminal_error(
    err: S2Error,
    terminal_err: &OnceLock<S2Error>,
    pending_acks: &mut VecDeque<PendingRecordAck>,
    stashed_submission: &mut Option<StashedSubmission>,
    close_tx: &mut Option<oneshot::Sender<Result<(), S2Error>>>,
    cmd_rx: &mut mpsc::Receiver<Command>,
) {
    let _ = terminal_err.set(err.clone());
    for pending in pending_acks.drain(..) { // <-- BUG 🔴 Acks already moved to claimable_tickets are never notified
        let _ = pending.ack_tx.send(Err(err.clone()));
    }
    // ... rest of function doesn't handle claimable_tickets
}

Logical proof
- When pushing a `BatchSubmitTicket` into `claimable_tickets`, `pending_acks.drain(..batch_len)` executes immediately, moving those acks into a closure captured by the mapped future.
- On error, `propagate_terminal_error` only iterates `pending_acks`, sending errors to acks that remain there. It has no access to acks already drained into `claimable_tickets`.
- The run loop returns after calling `propagate_terminal_error`. The futures in `claimable_tickets` are dropped without being polled, so their closures never run and the captured acks never have their senders invoked.
- Those oneshot senders are dropped without sending, so callers observe inconsistent behavior (e.g., a generic dropped error or terminal error via fallback) rather than the specific error being propagated as intended.
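The chain of events above can be reproduced in a small standalone model. This is only a sketch under simplifying assumptions: `std::sync::mpsc` channels stand in for the oneshot ack channels, a boxed closure stands in for the mapped ticket future, and `Ack`, `PendingAck`, and `simulate_buggy_shutdown` are hypothetical names that do not come from the real `Producer` code.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for the real ack payload: Ok, or a terminal error message.
type Ack = Result<(), String>;

// Hypothetical stand-in for PendingRecordAck.
struct PendingAck {
    ack_tx: Sender<Ack>,
}

/// Models the buggy flow and returns what each caller observes.
/// `None` means the sender was dropped without sending anything.
/// Assumes `batch_len <= total` (otherwise `drain` panics).
fn simulate_buggy_shutdown(total: usize, batch_len: usize) -> Vec<Option<Ack>> {
    let mut pending_acks: VecDeque<PendingAck> = VecDeque::new();
    let mut receivers: Vec<Receiver<Ack>> = Vec::new();
    for _ in 0..total {
        let (ack_tx, ack_rx) = channel();
        pending_acks.push_back(PendingAck { ack_tx });
        receivers.push(ack_rx);
    }

    // Mirrors `claimable_tickets.push(ticket.map({ ... }))`: the drain runs
    // right away, and the acks move into a closure that only runs if the
    // ticket is ever completed.
    let mut claimable_tickets: Vec<Box<dyn FnOnce(Ack)>> = Vec::new();
    let drained: Vec<PendingAck> = pending_acks.drain(..batch_len).collect();
    claimable_tickets.push(Box::new(move |batch_ack: Ack| {
        for pending in drained {
            let _ = pending.ack_tx.send(batch_ack.clone());
        }
    }));

    // Terminal error path: only acks still in `pending_acks` are notified.
    let err = String::from("terminal error");
    for pending in pending_acks.drain(..) {
        let _ = pending.ack_tx.send(Err(err.clone()));
    }

    // The run loop returns; the tickets are dropped without their closures
    // running, which drops the captured senders silently.
    drop(claimable_tickets);

    receivers.iter().map(|rx| rx.recv().ok()).collect()
}

fn main() {
    for (i, outcome) in simulate_buggy_shutdown(3, 2).iter().enumerate() {
        println!("caller {i}: {outcome:?}");
    }
}
```

In this model, the callers whose acks were drained into the ticket see a disconnected channel (`None`), and only the caller still in `pending_acks` receives the terminal error.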
Recommended fix
Update error propagation to include claimable_tickets: accept it as a parameter and drain it, notifying all captured acks with the error before exiting.
async fn propagate_terminal_error<F, A>(
    err: S2Error,
    terminal_err: &OnceLock<S2Error>,
    pending_acks: &mut VecDeque<PendingRecordAck>,
    // Generic over the ticket future, since `_` is not allowed as a type in a fn signature.
    claimable_tickets: &mut FuturesUnordered<F>,
    stashed_submission: &mut Option<StashedSubmission>,
    close_tx: &mut Option<oneshot::Sender<Result<(), S2Error>>>,
    cmd_rx: &mut mpsc::Receiver<Command>,
) where
    F: Future<Output = (A, Vec<PendingRecordAck>)>,
{
    let _ = terminal_err.set(err.clone());
    for pending in pending_acks.drain(..) {
        let _ = pending.ack_tx.send(Err(err.clone()));
    }
    // `next()` requires `futures::StreamExt` to be in scope.
    while let Some((_, ticket_acks)) = claimable_tickets.next().await { // <-- FIX 🟢 Drain and notify acks captured in claimable_tickets
        for pending in ticket_acks {
            let _ = pending.ack_tx.send(Err(err.clone()));
        }
    }
    // ... existing handling of stashed_submission, close_tx, and cmd_rx
}
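The effect of notifying both collections can be checked in a simplified standalone model. This is a sketch, not the fix proposed in this issue: it uses `std::sync::mpsc` channels in place of oneshot channels and, unlike the recommendation to await the ticket futures, it keeps the drained acks in a plain field of a hypothetical `ClaimableTicket` struct so the error path can reach them directly. All type and function names here are assumptions made for illustration.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical stand-in for the real ack payload.
type Ack = Result<(), String>;

// Hypothetical stand-in for PendingRecordAck.
struct PendingAck {
    ack_tx: Sender<Ack>,
}

// Hypothetical ticket shape: drained acks are stored as data, not hidden
// inside a closure, so they stay reachable on the error path.
struct ClaimableTicket {
    acks: Vec<PendingAck>,
}

/// Notifies every waiting caller, whether its ack is still queued or has
/// already been attached to an in-flight ticket.
fn propagate_terminal_error(
    err: &str,
    pending_acks: &mut VecDeque<PendingAck>,
    claimable_tickets: &mut Vec<ClaimableTicket>,
) {
    // Acks attached to tickets belong to earlier records, so notify them first.
    for ticket in claimable_tickets.drain(..) {
        for pending in ticket.acks {
            let _ = pending.ack_tx.send(Err(err.to_string()));
        }
    }
    for pending in pending_acks.drain(..) {
        let _ = pending.ack_tx.send(Err(err.to_string()));
    }
}

/// Returns what each caller observes; `None` would mean a silently dropped ack.
/// Assumes `batch_len <= total` (otherwise `drain` panics).
fn simulate_fixed_shutdown(total: usize, batch_len: usize) -> Vec<Option<Ack>> {
    let mut pending_acks: VecDeque<PendingAck> = VecDeque::new();
    let mut receivers: Vec<Receiver<Ack>> = Vec::new();
    for _ in 0..total {
        let (ack_tx, ack_rx) = channel();
        pending_acks.push_back(PendingAck { ack_tx });
        receivers.push(ack_rx);
    }

    // A batch is submitted: its acks move from the queue onto the ticket.
    let mut claimable_tickets = vec![ClaimableTicket {
        acks: pending_acks.drain(..batch_len).collect(),
    }];

    propagate_terminal_error("terminal error", &mut pending_acks, &mut claimable_tickets);

    receivers.iter().map(|rx| rx.recv().ok()).collect()
}

fn main() {
    for (i, outcome) in simulate_fixed_shutdown(3, 2).iter().enumerate() {
        println!("caller {i}: {outcome:?}");
    }
}
```

In this model every caller receives the terminal error, including those whose acks had already been drained onto a ticket.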