Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions codex-rs/codex-api/src/endpoint/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
let mut body = serde_json::to_value(&request)
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
if request.store && self.session.provider().is_azure_responses_endpoint() {
attach_item_ids(&mut body, &request.input);
}
self.stream_request_with_body(body, options).await
}

pub async fn stream_request_with_body(
&self,
body: Value,
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
let ResponsesOptions {
conversation_id,
Expand All @@ -79,12 +92,6 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
turn_state,
} = options;

let mut body = serde_json::to_value(&request)
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
if request.store && self.session.provider().is_azure_responses_endpoint() {
attach_item_ids(&mut body, &request.input);
}

let mut headers = extra_headers;
if let Some(ref conv_id) = conversation_id {
insert_header(&mut headers, "x-client-request-id", conv_id);
Expand Down
15 changes: 12 additions & 3 deletions codex-rs/codex-api/src/endpoint/responses_websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ impl ResponsesWebsocketConnection {
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;
self.stream_request_with_body(request_body, connection_reused)
.await
}

pub async fn stream_request_with_body(
&self,
request_body: Value,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
Expand All @@ -224,9 +236,6 @@ impl ResponsesWebsocketConnection {
let models_etag = self.models_etag.clone();
let server_model = self.server_model.clone();
let telemetry = self.telemetry.clone();
let request_body = serde_json::to_value(&request).map_err(|err| {
ApiError::Stream(format!("failed to encode websocket request: {err}"))
})?;

let current_span = Span::current();
tokio::spawn(
Expand Down
8 changes: 7 additions & 1 deletion codex-rs/core/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@
"realtime_conversation": {
"type": "boolean"
},
"record_response_item_id": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
Expand Down Expand Up @@ -1995,6 +1998,9 @@
"realtime_conversation": {
"type": "boolean"
},
"record_response_item_id": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
Expand Down Expand Up @@ -2479,4 +2485,4 @@
},
"title": "ConfigToml",
"type": "object"
}
}
64 changes: 58 additions & 6 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::extract_response_debug_context_from_api_error;
use crate::response_debug_context::telemetry_api_error_message;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::response_item_id_serde::ResponseItemIdSerialization;
use crate::response_item_id_serde::serialize_responses_request_body;
use crate::response_item_id_serde::serialize_responses_ws_request_body;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_auth_recovery_tags;
Expand Down Expand Up @@ -137,6 +140,7 @@ struct ModelClientState {
enable_request_compression: bool,
include_timing_metrics: bool,
beta_features_header: Option<String>,
response_item_ids: ResponseItemIdSerialization,
disable_websockets: AtomicBool,
cached_websocket_session: StdMutex<WebsocketSession>,
}
Expand Down Expand Up @@ -178,6 +182,22 @@ pub struct ModelClient {
state: Arc<ModelClientState>,
}

/// Per-client switch for response item id handling in outgoing request bodies.
///
/// This is the publicly exported counterpart of the crate-internal
/// `ResponseItemIdSerialization` setting: callers hand it to the `ModelClient`
/// constructor, where it is converted (via `From`/`Into`) before being stored
/// in the client state.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ModelClientResponseItemIds {
    /// The default: response item id serialization is off.
    #[default]
    Disabled,
    /// Response item id serialization is on.
    // NOTE(review): the exact wire-format effect lives in the
    // `response_item_id_serde` module, which is not visible here — confirm there.
    Enabled,
}

/// Translates the public `ModelClient` option into the serializer setting.
///
/// The mapping is variant-for-variant: `Disabled` becomes `Disabled` and
/// `Enabled` becomes `Enabled`.
impl From<ModelClientResponseItemIds> for ResponseItemIdSerialization {
    fn from(ids: ModelClientResponseItemIds) -> Self {
        // Exhaustive on purpose: adding a variant to the public enum should
        // fail to compile here until a mapping is chosen.
        match ids {
            ModelClientResponseItemIds::Disabled => Self::Disabled,
            ModelClientResponseItemIds::Enabled => Self::Enabled,
        }
    }
}

/// A turn-scoped streaming session created from a [`ModelClient`].
///
/// The session establishes a Responses WebSocket connection lazily and reuses it across multiple
Expand Down Expand Up @@ -257,6 +277,7 @@ impl ModelClient {
enable_request_compression: bool,
include_timing_metrics: bool,
beta_features_header: Option<String>,
response_item_ids: ModelClientResponseItemIds,
) -> Self {
let codex_api_key_env_enabled = auth_manager
.as_ref()
Expand All @@ -273,6 +294,7 @@ impl ModelClient {
enable_request_compression,
include_timing_metrics,
beta_features_header,
response_item_ids: response_item_ids.into(),
disable_websockets: AtomicBool::new(false),
cached_websocket_session: StdMutex::new(WebsocketSession::default()),
}),
Expand Down Expand Up @@ -1049,7 +1071,18 @@ impl ModelClientSession {
client_setup.api_auth,
)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
let stream_result = client.stream_request(request, options).await;
let stream_result = if self.client.state.response_item_ids.is_enabled() {
let request_body =
serialize_responses_request_body(&request, self.client.state.response_item_ids)
.map_err(|err| {
map_api_error(ApiError::Stream(format!(
"failed to encode responses request: {err}"
)))
})?;
client.stream_request_with_body(request_body, options).await
} else {
client.stream_request(request, options).await
};

match stream_result {
Ok(stream) => {
Expand Down Expand Up @@ -1170,15 +1203,34 @@ impl ModelClientSession {

let ws_request = self.prepare_websocket_request(ws_payload, &request);
self.websocket_session.last_request = Some(request);
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
let connection = self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = stream_result
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let stream_result = if self.client.state.response_item_ids.is_enabled() {
let request_body = serialize_responses_ws_request_body(
&ws_request,
self.client.state.response_item_ids,
)
.map_err(|err| {
map_api_error(ApiError::Stream(format!(
"failed to encode websocket request: {err}"
)))
})?;
connection
.stream_request_with_body(
request_body,
self.websocket_session.connection_reused(),
)
.await
.map_err(map_api_error)?
} else {
connection
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?
};
let (stream, last_request_rx) =
map_response_stream(stream_result, session_telemetry.clone());
self.websocket_session.last_response_rx = Some(last_request_rx);
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/client_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::ModelClientResponseItemIds;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use codex_otel::SessionTelemetry;
Expand All @@ -24,6 +25,7 @@ fn test_model_client(session_source: SessionSource) -> ModelClient {
false,
false,
None,
ModelClientResponseItemIds::Disabled,
)
}

Expand Down
6 changes: 6 additions & 0 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ use uuid::Uuid;

use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client::ModelClientResponseItemIds;
use crate::client::ModelClientSession;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
Expand Down Expand Up @@ -1815,6 +1816,11 @@ impl Session {
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
if config.features.enabled(Feature::RecordResponseItemId) {
ModelClientResponseItemIds::Enabled
} else {
ModelClientResponseItemIds::Disabled
},
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/codex_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ fn test_model_client_session() -> crate::client::ModelClientSession {
false,
false,
None,
crate::client::ModelClientResponseItemIds::Disabled,
)
.new_session()
}
Expand Down Expand Up @@ -2515,6 +2516,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
crate::client::ModelClientResponseItemIds::Disabled,
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
Expand Down Expand Up @@ -3309,6 +3311,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
crate::client::ModelClientResponseItemIds::Disabled,
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
Expand Down
8 changes: 8 additions & 0 deletions codex-rs/core/src/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub enum Feature {
RealtimeConversation,
/// Route interactive startup to the app-server-backed TUI implementation.
TuiAppServer,
/// Persist and resend provider ResponseItem ids in internal rollout/client payloads.
RecordResponseItemId,
/// Prevent idle system sleep while a turn is actively running.
PreventIdleSleep,
/// Legacy rollout flag for Responses API WebSocket transport experiments.
Expand Down Expand Up @@ -839,6 +841,12 @@ pub const FEATURES: &[FeatureSpec] = &[
},
default_enabled: false,
},
FeatureSpec {
id: Feature::RecordResponseItemId,
key: "record_response_item_id",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::PreventIdleSleep,
key: "prevent_idle_sleep",
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub mod token_data;
mod truncate;
mod unified_exec;
pub mod windows_sandbox;
pub use client::ModelClientResponseItemIds;
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
Expand All @@ -92,6 +93,7 @@ pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
mod event_mapping;
mod response_debug_context;
mod response_item_id_serde;
pub mod review_format;
pub mod review_prompts;
mod seatbelt_permissions;
Expand Down
Loading
Loading