This repository was archived by the owner on Feb 16, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
[Detail Bug] append_session misclassifies initial HTTP 412 PRECONDITION_FAILED as generic server errorΒ #281
Copy link
Copy link
Closed
Description
Detail Bug Report
Summary
- Context: The
append_sessionmethod establishes a streaming session to append records to a stream. - Bug: When the server returns an initial HTTP 412 PRECONDITION_FAILED error before streaming starts,
append_sessiondoes not parse the response body asAppendConditionFailed. - Actual vs. expected: The error is parsed as a generic
ApiErrorResponseand returned asApiError::Server, but it should be parsed asAppendConditionFailedand returned asApiError::AppendConditionFailed. - Impact: Users cannot distinguish between append condition failures (which contain specific failure reasons like fencing token mismatch or sequence number mismatch) and other server errors when using
append_session.
Code with bug
pub async fn append_session<I>(
&self,
name: &StreamName,
inputs: I,
) -> Result<Streaming<AppendAck>, ApiError>
where
I: Stream<Item = AppendInput> + Send + 'static,
{
let url = self
.base_url
.join(&format!("v1/streams/{}/records", urlencoding::encode(name)))?;
let compression = self.config.compression.into();
let encoded_stream = inputs.map(move |input| {
s2s::SessionMessage::regular(compression, &input).map(|msg| msg.encode())
});
let mut request = self
.post(url)
.header(CONTENT_TYPE, CONTENT_TYPE_S2S)
.body(reqwest::Body::wrap_stream(encoded_stream))
.timeout(SESSION_REQUEST_TIMEOUT);
request = add_basin_header_if_required(request, &self.config.endpoints, &self.name);
let response = request.send().await?.into_result().await?; // <-- BUG π΄ Uses default error handling (no 412 mapping)
let mut bytes_stream = response.bytes_stream();
let mut buffer = BytesMut::new();
let mut decoder = FrameDecoder;
Ok(Box::pin(try_stream! {
while let Some(chunk) = bytes_stream.next().await {
let chunk = chunk?;
buffer.extend_from_slice(&chunk);
loop {
match decoder.decode(&mut buffer) {
Ok(Some(SessionMessage::Regular(msg))) => {
yield msg.try_into_proto()?;
}
Ok(Some(SessionMessage::Terminal(msg))) => {
Err::<(), ApiError>(msg.into())?; // <-- Terminal messages are handled correctly
}
Ok(None) => break,
Err(err) => Err(err)?,
}
}
}
}))
}Codebase inconsistency
- In
append, initial HTTP errors are handled with a custom handler that maps 412 toAppendConditionFailed. Inappend_session, the defaultinto_result()is used, losing structured 412 details.
Reference (src/api.rs, BasinClient::append):
let response = self
.request(request)
.with_retry_enabled(retry_enabled)
.error_handler(|status, response| async move {
if status == StatusCode::PRECONDITION_FAILED {
Err(ApiError::AppendConditionFailed(
response.json::<AppendConditionFailed>().await?,
))
} else {
Err(ApiError::Server(
status,
response.json::<ApiErrorResponse>().await?,
))
}
})
.send()
.await?;Current (src/api.rs, BasinClient::append_session):
let response = request.send().await?.into_result().await?;This discrepancy means an initial HTTP 412 from the server (before streaming starts) is surfaced as a generic ApiError::Server rather than ApiError::AppendConditionFailed.
Recommended fix
Use a custom error handler in append_session mirroring append to map HTTP 412 to AppendConditionFailed:
let response = request
.send()
.await?
.into_result_with_handler(|status, response| async move {
if status == StatusCode::PRECONDITION_FAILED { // <-- FIX π’ map 412 to AppendConditionFailed
Err(ApiError::AppendConditionFailed(
response.json::<AppendConditionFailed>().await?,
))
} else {
Err(ApiError::Server(
status,
response.json::<ApiErrorResponse>().await?,
))
}
})
.await?;This preserves existing terminal-message handling for failures occurring after the stream starts while restoring structured error details for initial HTTP 412 responses.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels