This repository was archived by the owner on Feb 16, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
[Detail Bug] ReadStart mutated to invalid state during read_session retry (multiple position fields set)Β #283
Copy link
Copy link
Closed
Description
Detail Bug Report
Summary
- Context: The
read_sessionfunction insrc/session/read.rsmanages streaming reads from S2 streams, tracking the current read position to enable resumption after errors. - Bug: When updating the read start position after consuming a batch, the code only sets
start.seq_numwithout clearing the originaltimestamportail_offsetfields, resulting in multiple position fields being set simultaneously. - Actual vs. expected: The mutated
ReadStartstruct violates the API contract that "only one of seq_num, timestamp, or tail_offset can be provided", whereas it should have onlyseq_numset after consuming batches. - Impact: When a read session is started with
ReadFrom::TimestamporReadFrom::TailOffsetand an error occurs after reading one or more batches, the retry will fail with a validation error instead of successfully resuming from the correct position.
Code with bug
if let Some(record) = batch.records.last() {
start.seq_num = Some(record.seq_num + 1); // <-- BUG π΄ only sets seq_num without clearing timestamp/tail_offset
}Codebase inconsistency
The API enforces that exactly one of seq_num, timestamp, or tail_offset is set. Any combination with multiple fields set returns a validation error:
impl TryFrom<ReadStart> for types::stream::ReadStart {
type Error = types::ValidationError;
fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
let from = match (value.seq_num, value.timestamp, value.tail_offset) {
(Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
(None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
(None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
(None, None, None) => types::stream::ReadFrom::TailOffset(0),
_ => {
return Err(types::ValidationError(
"only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
));
}
};
let clamp = value.clamp.unwrap_or(false);
Ok(Self { from, clamp })
}
}Because the read logic mutates start.seq_num without clearing the other fields, a retry can send an invalid ReadStart. Example state after one batch when starting from a timestamp: ReadStart { seq_num: Some(100), timestamp: Some(1000), tail_offset: None, clamp: None } β this conversion returns ValidationError and prevents resumption.
Recommended fix
Rebuild ReadStart when advancing to the next position so only seq_num is set and other position fields are cleared (preserve clamp):
if let Some(record) = batch.records.last() {
start = ReadStart { // <-- FIX π’ reconstruct to clear other positions
seq_num: Some(record.seq_num + 1),
timestamp: None,
tail_offset: None,
clamp: start.clamp,
};
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels