Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.

[Detail Bug] append_session misclassifies initial HTTP 412 PRECONDITION_FAILED as generic server errorΒ #281

@detail-app

Description

@detail-app

Detail Bug Report

https://app.detail.dev/org_89d327b3-b883-4365-b6a3-46b6701342a9/bugs/bug_6c33f00a-110b-4133-9b9c-3e837e728a82

Summary

  • Context: The append_session method establishes a streaming session to append records to a stream.
  • Bug: When the server returns an initial HTTP 412 PRECONDITION_FAILED error before streaming starts, append_session does not parse the response body as AppendConditionFailed.
  • Actual vs. expected: The error is parsed as a generic ApiErrorResponse and returned as ApiError::Server, but it should be parsed as AppendConditionFailed and returned as ApiError::AppendConditionFailed.
  • Impact: Users cannot distinguish between append condition failures (which contain specific failure reasons like fencing token mismatch or sequence number mismatch) and other server errors when using append_session.

Code with bug

pub async fn append_session<I>(
    &self,
    name: &StreamName,
    inputs: I,
) -> Result<Streaming<AppendAck>, ApiError>
where
    I: Stream<Item = AppendInput> + Send + 'static,
{
    let url = self
        .base_url
        .join(&format!("v1/streams/{}/records", urlencoding::encode(name)))?;

    let compression = self.config.compression.into();

    let encoded_stream = inputs.map(move |input| {
        s2s::SessionMessage::regular(compression, &input).map(|msg| msg.encode())
    });

    let mut request = self
        .post(url)
        .header(CONTENT_TYPE, CONTENT_TYPE_S2S)
        .body(reqwest::Body::wrap_stream(encoded_stream))
        .timeout(SESSION_REQUEST_TIMEOUT);
    request = add_basin_header_if_required(request, &self.config.endpoints, &self.name);
    let response = request.send().await?.into_result().await?;  // <-- BUG πŸ”΄ Uses default error handling (no 412 mapping)
    let mut bytes_stream = response.bytes_stream();

    let mut buffer = BytesMut::new();
    let mut decoder = FrameDecoder;

    Ok(Box::pin(try_stream! {
        while let Some(chunk) = bytes_stream.next().await {
            let chunk = chunk?;
            buffer.extend_from_slice(&chunk);

            loop {
                match decoder.decode(&mut buffer) {
                    Ok(Some(SessionMessage::Regular(msg))) => {
                        yield msg.try_into_proto()?;
                    }
                    Ok(Some(SessionMessage::Terminal(msg))) => {
                        Err::<(), ApiError>(msg.into())?;  // <-- Terminal messages are handled correctly
                    }
                    Ok(None) => break,
                    Err(err) => Err(err)?,
                }
            }
        }
    }))
}

Codebase inconsistency

  • In append, initial HTTP errors are handled with a custom handler that maps 412 to AppendConditionFailed. In append_session, the default into_result() is used, losing structured 412 details.

Reference (src/api.rs, BasinClient::append):

let response = self
    .request(request)
    .with_retry_enabled(retry_enabled)
    .error_handler(|status, response| async move {
        if status == StatusCode::PRECONDITION_FAILED {
            Err(ApiError::AppendConditionFailed(
                response.json::<AppendConditionFailed>().await?,
            ))
        } else {
            Err(ApiError::Server(
                status,
                response.json::<ApiErrorResponse>().await?,
            ))
        }
    })
    .send()
    .await?;

Current (src/api.rs, BasinClient::append_session):

let response = request.send().await?.into_result().await?;

This discrepancy means an initial HTTP 412 from the server (before streaming starts) is surfaced as a generic ApiError::Server rather than ApiError::AppendConditionFailed.

Recommended fix

Use a custom error handler in append_session mirroring append to map HTTP 412 to AppendConditionFailed:

let response = request
    .send()
    .await?
    .into_result_with_handler(|status, response| async move {
        if status == StatusCode::PRECONDITION_FAILED {  // <-- FIX 🟒 map 412 to AppendConditionFailed
            Err(ApiError::AppendConditionFailed(
                response.json::<AppendConditionFailed>().await?,
            ))
        } else {
            Err(ApiError::Server(
                status,
                response.json::<ApiErrorResponse>().await?,
            ))
        }
    })
    .await?;

This preserves existing terminal-message handling for failures occurring after the stream starts while restoring structured error details for initial HTTP 412 responses.

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions