Skip to content

Commit bbbaa81

Browse files
committed
fix(tui): align rebased thread events and extract legacy bridge
Restore `HistoryEntryResponse` buffering on the rebased app-server thread pipeline so history lookups still replay and route correctly after moving onto `upstream/main`. Extract the remote legacy exec-approval bridge into a dedicated helper in `app_server_adapter.rs` so the special-case path is isolated from normal server request handling.
1 parent c7b24c6 commit bbbaa81

File tree

2 files changed

+127
-56
lines changed

2 files changed

+127
-56
lines changed

codex-rs/tui_app_server/src/app.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ impl ThreadEventStore {
618618
.pending_interactive_replay
619619
.should_replay_snapshot_request(request),
620620
ThreadBufferedEvent::Notification(_)
621+
| ThreadBufferedEvent::HistoryEntryResponse(_)
621622
| ThreadBufferedEvent::LegacyWarning(_)
622623
| ThreadBufferedEvent::LegacyRollback { .. } => true,
623624
})
@@ -2379,6 +2380,50 @@ impl App {
23792380
Ok(())
23802381
}
23812382

2383+
async fn enqueue_thread_history_entry_response(
2384+
&mut self,
2385+
thread_id: ThreadId,
2386+
event: GetHistoryEntryResponseEvent,
2387+
) -> Result<()> {
2388+
let (sender, store) = {
2389+
let channel = self.ensure_thread_channel(thread_id);
2390+
(channel.sender.clone(), Arc::clone(&channel.store))
2391+
};
2392+
2393+
let should_send = {
2394+
let mut guard = store.lock().await;
2395+
guard
2396+
.buffer
2397+
.push_back(ThreadBufferedEvent::HistoryEntryResponse(event.clone()));
2398+
if guard.buffer.len() > guard.capacity
2399+
&& let Some(removed) = guard.buffer.pop_front()
2400+
&& let ThreadBufferedEvent::Request(request) = &removed
2401+
{
2402+
guard
2403+
.pending_interactive_replay
2404+
.note_evicted_server_request(request);
2405+
}
2406+
guard.active
2407+
};
2408+
2409+
if should_send {
2410+
match sender.try_send(ThreadBufferedEvent::HistoryEntryResponse(event)) {
2411+
Ok(()) => {}
2412+
Err(TrySendError::Full(event)) => {
2413+
tokio::spawn(async move {
2414+
if let Err(err) = sender.send(event).await {
2415+
tracing::warn!("thread {thread_id} event channel closed: {err}");
2416+
}
2417+
});
2418+
}
2419+
Err(TrySendError::Closed(_)) => {
2420+
tracing::warn!("thread {thread_id} event channel closed");
2421+
}
2422+
}
2423+
}
2424+
Ok(())
2425+
}
2426+
23822427
async fn enqueue_thread_legacy_rollback(
23832428
&mut self,
23842429
thread_id: ThreadId,
@@ -2470,6 +2515,10 @@ impl App {
24702515
ThreadBufferedEvent::Request(request) => {
24712516
self.enqueue_thread_request(thread_id, request).await?;
24722517
}
2518+
ThreadBufferedEvent::HistoryEntryResponse(event) => {
2519+
self.enqueue_thread_history_entry_response(thread_id, event)
2520+
.await?;
2521+
}
24732522
ThreadBufferedEvent::LegacyWarning(message) => {
24742523
self.enqueue_thread_legacy_warning(thread_id, message)
24752524
.await?;
@@ -4795,6 +4844,9 @@ impl App {
47954844
self.chat_widget
47964845
.handle_server_request(request, /*replay_kind*/ None);
47974846
}
4847+
ThreadBufferedEvent::HistoryEntryResponse(event) => {
4848+
self.chat_widget.handle_history_entry_response(event);
4849+
}
47984850
ThreadBufferedEvent::LegacyWarning(message) => {
47994851
self.chat_widget.add_warning_message(message);
48004852
}
@@ -4816,6 +4868,9 @@ impl App {
48164868
ThreadBufferedEvent::Request(request) => self
48174869
.chat_widget
48184870
.handle_server_request(request, Some(ReplayKind::ThreadSnapshot)),
4871+
ThreadBufferedEvent::HistoryEntryResponse(event) => {
4872+
self.chat_widget.handle_history_entry_response(event)
4873+
}
48194874
ThreadBufferedEvent::LegacyWarning(message) => {
48204875
self.chat_widget.add_warning_message(message);
48214876
}

codex-rs/tui_app_server/src/app/app_server_adapter.rs

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -164,63 +164,10 @@ impl App {
164164
app_server_client: &AppServerSession,
165165
request: ServerRequest,
166166
) {
167-
if app_server_client.is_remote()
168-
&& let ServerRequest::ExecCommandApproval { .. } = &request
167+
if self
168+
.try_handle_legacy_remote_exec_approval_request(app_server_client, &request)
169+
.await
169170
{
170-
match bridge_legacy_exec_approval_request(&request) {
171-
Ok((thread_id, bridged_request)) => {
172-
if let Some(unsupported) = self
173-
.pending_app_server_requests
174-
.note_server_request(&request)
175-
{
176-
tracing::warn!(
177-
request_id = ?unsupported.request_id,
178-
message = unsupported.message,
179-
"rejecting unsupported app-server request"
180-
);
181-
self.chat_widget
182-
.add_error_message(unsupported.message.clone());
183-
if let Err(err) = self
184-
.reject_app_server_request(
185-
app_server_client,
186-
unsupported.request_id,
187-
unsupported.message,
188-
)
189-
.await
190-
{
191-
tracing::warn!("{err}");
192-
}
193-
return;
194-
}
195-
196-
let result = if self.primary_thread_id == Some(thread_id)
197-
|| self.primary_thread_id.is_none()
198-
{
199-
self.enqueue_primary_thread_request(bridged_request).await
200-
} else {
201-
self.enqueue_thread_request(thread_id, bridged_request)
202-
.await
203-
};
204-
if let Err(err) = result {
205-
self.reject_failed_legacy_exec_approval_enqueue(
206-
app_server_client,
207-
&request,
208-
err.to_string(),
209-
)
210-
.await;
211-
}
212-
}
213-
Err(message) => {
214-
tracing::warn!(request_id = ?request.id(), "{message}");
215-
self.chat_widget.add_error_message(message.clone());
216-
if let Err(err) = self
217-
.reject_app_server_request(app_server_client, request.id().clone(), message)
218-
.await
219-
{
220-
tracing::warn!("{err}");
221-
}
222-
}
223-
}
224171
return;
225172
}
226173

@@ -264,6 +211,75 @@ impl App {
264211
}
265212
}
266213

214+
async fn try_handle_legacy_remote_exec_approval_request(
215+
&mut self,
216+
app_server_client: &AppServerSession,
217+
request: &ServerRequest,
218+
) -> bool {
219+
if !app_server_client.is_remote()
220+
|| !matches!(request, ServerRequest::ExecCommandApproval { .. })
221+
{
222+
return false;
223+
}
224+
225+
match bridge_legacy_exec_approval_request(request) {
226+
Ok((thread_id, bridged_request)) => {
227+
if let Some(unsupported) = self
228+
.pending_app_server_requests
229+
.note_server_request(request)
230+
{
231+
tracing::warn!(
232+
request_id = ?unsupported.request_id,
233+
message = unsupported.message,
234+
"rejecting unsupported app-server request"
235+
);
236+
self.chat_widget
237+
.add_error_message(unsupported.message.clone());
238+
if let Err(err) = self
239+
.reject_app_server_request(
240+
app_server_client,
241+
unsupported.request_id,
242+
unsupported.message,
243+
)
244+
.await
245+
{
246+
tracing::warn!("{err}");
247+
}
248+
return true;
249+
}
250+
251+
let result = if self.primary_thread_id == Some(thread_id)
252+
|| self.primary_thread_id.is_none()
253+
{
254+
self.enqueue_primary_thread_request(bridged_request).await
255+
} else {
256+
self.enqueue_thread_request(thread_id, bridged_request)
257+
.await
258+
};
259+
if let Err(err) = result {
260+
self.reject_failed_legacy_exec_approval_enqueue(
261+
app_server_client,
262+
request,
263+
err.to_string(),
264+
)
265+
.await;
266+
}
267+
}
268+
Err(message) => {
269+
tracing::warn!(request_id = ?request.id(), "{message}");
270+
self.chat_widget.add_error_message(message.clone());
271+
if let Err(err) = self
272+
.reject_app_server_request(app_server_client, request.id().clone(), message)
273+
.await
274+
{
275+
tracing::warn!("{err}");
276+
}
277+
}
278+
}
279+
280+
true
281+
}
282+
267283
async fn reject_failed_legacy_exec_approval_enqueue(
268284
&mut self,
269285
app_server_client: &AppServerSession,

0 commit comments

Comments
 (0)