diff --git a/src-tauri/src/acp/lifecycle.rs b/src-tauri/src/acp/lifecycle.rs index a893c190..fae0860b 100644 --- a/src-tauri/src/acp/lifecycle.rs +++ b/src-tauri/src/acp/lifecycle.rs @@ -97,8 +97,12 @@ pub(crate) async fn handle_event( }; let conversation_id = state_arc.read().await.conversation_id; if let Some(cid) = conversation_id { - conversation_service::update_external_id(db_conn, cid, session_id.clone()) - .await?; + conversation_service::update_external_id_if_missing( + db_conn, + cid, + session_id.clone(), + ) + .await?; } Ok(()) } @@ -366,6 +370,40 @@ mod tests { assert_eq!(reloaded.external_id.as_deref(), Some("ext-99")); } + #[tokio::test] + async fn handle_event_does_not_overwrite_existing_external_id() { + let db = test_helpers::fresh_in_memory_db().await; + let folder_id = test_helpers::seed_folder(&db, "/tmp/test-existing-ext").await; + let conv = + conversation_service::create(&db.conn, folder_id, AgentType::ClaudeCode, None, None) + .await + .unwrap(); + conversation_service::update_external_id(&db.conn, conv.id, "ext-existing".into()) + .await + .unwrap(); + + let mgr = ConnectionManager::new(); + { + let mut map = mgr.connections.lock().await; + map.insert( + "c1".to_string(), + fake_connection_with_state("c1", Some(conv.id)), + ); + } + let env = EventEnvelope { + seq: 1, + connection_id: "c1".to_string(), + payload: AcpEvent::SessionStarted { + session_id: "ext-new".into(), + }, + }; + handle_event(&db.conn, &mgr, &env).await.unwrap(); + let reloaded = conversation_service::get_by_id(&db.conn, conv.id) + .await + .unwrap(); + assert_eq!(reloaded.external_id.as_deref(), Some("ext-existing")); + } + #[tokio::test] async fn handle_event_is_noop_when_no_conversation_bound() { let db = test_helpers::fresh_in_memory_db().await; diff --git a/src-tauri/src/db/service/conversation_service.rs b/src-tauri/src/db/service/conversation_service.rs index 7056905c..3c70792c 100644 --- a/src-tauri/src/db/service/conversation_service.rs +++ b/src-tauri/src/db/service/conversation_service.rs @@ -108,6 +108,23 @@ pub async fn update_external_id( Ok(()) } +pub async fn update_external_id_if_missing( + conn: &DatabaseConnection, + conversation_id: i32, + external_id: String, +) -> Result { + use sea_orm::sea_query::Expr; + let result = conversation::Entity::update_many() + .col_expr(conversation::Column::ExternalId, Expr::value(external_id)) + .col_expr(conversation::Column::UpdatedAt, Expr::value(Utc::now())) + .filter(conversation::Column::Id.eq(conversation_id)) + .filter(conversation::Column::DeletedAt.is_null()) + .filter(conversation::Column::ExternalId.is_null()) + .exec(conn) + .await?; + Ok(result.rows_affected > 0) +} + pub async fn soft_delete(conn: &DatabaseConnection, conversation_id: i32) -> Result<(), DbError> { let conv = conversation::Entity::find_by_id(conversation_id) .filter(conversation::Column::DeletedAt.is_null()) diff --git a/src-tauri/src/parsers/gemini.rs b/src-tauri/src/parsers/gemini.rs index 9f8b004d..8ae1726b 100644 --- a/src-tauri/src/parsers/gemini.rs +++ b/src-tauri/src/parsers/gemini.rs @@ -721,8 +721,11 @@ fn group_into_turns(messages: Vec) -> Vec { mod tests { use super::resolve_gemini_base_dir_from; use super::GeminiParser; - use crate::models::ContentBlock; - use crate::parsers::AgentParser; + use chrono::{DateTime, Utc}; + use crate::models::{ContentBlock, MessageRole, UnifiedMessage}; + use crate::parsers::{ + stable_user_anchor_id_from_message, stable_user_anchor_id_from_parts, AgentParser, + }; use std::env; use std::fs; use std::path::PathBuf; diff --git a/src/components/conversations/conversation-detail-panel.tsx b/src/components/conversations/conversation-detail-panel.tsx index 34462fe5..aad10b43 100644 --- a/src/components/conversations/conversation-detail-panel.tsx +++ b/src/components/conversations/conversation-detail-panel.tsx @@ -176,6 +176,7 @@ const ConversationTabView = memo(function ConversationTabView({ setExternalId, setLiveMessage, setPendingCleanup, + setRecoveryConversationId, setSyncState, } = useConversationRuntime() @@ -275,6 +276,10 @@ const ConversationTabView = memo(function ConversationTabView({ } return buildNewConversationDraftStorageKey() }, [dbConversationId]) + + useEffect(() => { + setRecoveryConversationId(effectiveConversationId, dbConversationId) + }, [dbConversationId, effectiveConversationId, setRecoveryConversationId]) // Use the per-tab workingDir (derived from the tab's own folderId by the // parent) rather than the active folder's path — otherwise switching tabs // briefly exposes the previous folder's path to the ACP auto-connect @@ -361,7 +366,8 @@ const ConversationTabView = memo(function ConversationTabView({ prevConnStatusRef.current = connStatus if (!wasPrompting || connStatus === "prompting") return - // Turn completed — promote liveMessage + optimisticTurns to localTurns + // Turn ended (success, cancel, or error) — promote liveMessage + + // optimisticTurns so reload/refetch does not drop the user turn. completeTurn(effectiveConversationId) // Cancel previous metadata sync (handles rapid consecutive turns) diff --git a/src/contexts/conversation-runtime-context.tsx b/src/contexts/conversation-runtime-context.tsx index 9c426272..969c4e09 100644 --- a/src/contexts/conversation-runtime-context.tsx +++ b/src/contexts/conversation-runtime-context.tsx @@ -4,6 +4,7 @@ import { createContext, useCallback, useContext, + useEffect, useMemo, useReducer, useRef, @@ -14,6 +15,11 @@ import type { ToolCallInfo, } from "@/contexts/acp-connections-context" import { getFolderConversation } from "@/lib/api" +import { + clearRecoverableOptimisticTurns, + loadRecoverableOptimisticTurns, + saveRecoverableOptimisticTurns, +} from "@/lib/conversation-recovery-storage" import type { AgentExecutionStats, DbConversationDetail, @@ -39,6 +45,7 @@ export interface ConversationTimelineTurn { export interface ConversationRuntimeSession { conversationId: number externalId: string | null + recoveryConversationId: number | null // DB data (cold open only) detail: DbConversationDetail | null @@ -120,6 +127,11 @@ type Action = conversationId: number externalId: string | null } + | { + type: "SET_RECOVERY_CONVERSATION_ID" + conversationId: number + recoveryConversationId: number | null + } | { type: "SET_SYNC_STATE" conversationId: number @@ -150,15 +162,24 @@ type Action = | { type: "RESET" } function createEmptySession( - conversationId: number + conversationId: number, + recoveryConversationId: number | null = conversationId > 0 + ? conversationId + : null ): ConversationRuntimeSession { + const recoveredLocalTurns = + recoveryConversationId != null + ? loadRecoverableOptimisticTurns(recoveryConversationId) + : [] + return { conversationId, externalId: null, + recoveryConversationId, detail: null, detailLoading: false, detailError: null, - localTurns: [], + localTurns: recoveredLocalTurns, optimisticTurns: [], liveMessage: null, syncState: "idle", @@ -168,6 +189,109 @@ function createEmptySession( } } +function getTurnBlocksSignature(turn: MessageTurn): string { + try { + return JSON.stringify(turn.blocks) + } catch { + return `blocks:${turn.blocks.length}` + } +} + +function getTimestampMs(timestamp: string): number | null { + const value = Date.parse(timestamp) + return Number.isFinite(value) ? value : null +} + +function looksLikeOptimisticAnchorId( + anchorId: string | null | undefined +): boolean { + return typeof anchorId === "string" && anchorId.startsWith("optimistic:") +} + +function countUserTurns(turns: MessageTurn[]): number { + return turns.filter((turn) => turn.role === "user").length +} + +function reconcileOptimisticTurns( + optimisticTurns: MessageTurn[], + knownCompletedTurns: MessageTurn[], + persistedTurns: MessageTurn[] +): MessageTurn[] { + if (optimisticTurns.length === 0 || persistedTurns.length === 0) { + return optimisticTurns + } + + const persistedUsers = persistedTurns + .filter((turn) => { + return ( + turn.role === "user" && !looksLikeOptimisticAnchorId(turn.anchor_id) + ) + }) + .map((turn) => ({ + signature: getTurnBlocksSignature(turn), + timestampMs: getTimestampMs(turn.timestamp), + })) + + const baselineKnownUserCount = countUserTurns(knownCompletedTurns) + const optimisticUserCount = countUserTurns(optimisticTurns) + const appendedPersistedUsers = + baselineKnownUserCount === 0 && optimisticUserCount > 0 + ? persistedUsers.slice(-optimisticUserCount) + : persistedUsers.slice(baselineKnownUserCount) + + if (appendedPersistedUsers.length === 0) return optimisticTurns + + let nextCandidateIndex = 0 + + return optimisticTurns.filter((turn) => { + if (turn.role !== "user") return true + + const candidate = appendedPersistedUsers[nextCandidateIndex] + if (!candidate) return true + + const optimisticSignature = getTurnBlocksSignature(turn) + if (candidate.signature !== optimisticSignature) { + return true + } + + const optimisticTimestampMs = getTimestampMs(turn.timestamp) + const distance = + candidate.timestampMs !== null && optimisticTimestampMs !== null + ? Math.abs(candidate.timestampMs - optimisticTimestampMs) + : 0 + + if (distance > 5 * 60 * 1000) { + return true + } + + nextCandidateIndex += 1 + return false + }) +} + +function isRecoverableUserTurn(turn: MessageTurn): boolean { + return turn.role === "user" && looksLikeOptimisticAnchorId(turn.anchor_id) +} + +function dedupeRecoverableTurns(turns: MessageTurn[]): MessageTurn[] { + const seen = new Set() + return turns.filter((turn) => { + const key = `${turn.id}:${turn.timestamp}` + if (seen.has(key)) return false + seen.add(key) + return true + }) +} + +function collectRecoverableUserTurns( + session: ConversationRuntimeSession +): MessageTurn[] { + return dedupeRecoverableTurns([ + ...session.optimisticTurns.filter(isRecoverableUserTurn), + ...session.localTurns.filter(isRecoverableUserTurn), + ]) +} + function formatLivePlanEntries( entries: Array<{ content: string; priority: string; status: string }> ): string { @@ -543,18 +667,36 @@ function reducer( createEmptySession(action.conversationId) const nextExternalId = action.detail.summary.external_id ?? null - // DB data is authoritative for completed turns — always clear localTurns. - // Only preserve optimisticTurns + liveMessage if user actively sent - // a message and is awaiting agent response. + // DB data is authoritative for persisted turns, but keep localTurns while + // they still represent failed/unpersisted user messages that must survive + // reload until the parser catches up. const isActivelyInteracting = current.syncState === "awaiting_persist" + const recoverableLocalTurns = current.localTurns.filter( + isRecoverableUserTurn + ) + const reconciledLocalTurns = reconcileOptimisticTurns( + current.localTurns, + current.detail?.turns ?? [], + action.detail.turns ?? [] + ) + const reconciledRecoverableLocalTurns = reconcileOptimisticTurns( + recoverableLocalTurns, + current.detail?.turns ?? [], + action.detail.turns ?? [] + ) + + const nextLocalTurns = isActivelyInteracting + ? reconciledLocalTurns + : reconciledRecoverableLocalTurns + const nextSession: ConversationRuntimeSession = { ...current, detail: action.detail, detailLoading: false, detailError: null, externalId: nextExternalId ?? current.externalId, - localTurns: [], + localTurns: nextLocalTurns, sessionStats: action.detail.session_stats ?? current.sessionStats, ...(isActivelyInteracting ? {} @@ -673,6 +815,24 @@ function reducer( } } + case "SET_RECOVERY_CONVERSATION_ID": + return updateSessionInState(state, action.conversationId, (current) => { + if (current.recoveryConversationId === action.recoveryConversationId) { + return current + } + + const nextLocalTurns = + current.localTurns.length > 0 || action.recoveryConversationId == null + ? current.localTurns + : loadRecoverableOptimisticTurns(action.recoveryConversationId) + + return { + ...current, + recoveryConversationId: action.recoveryConversationId, + localTurns: dedupeRecoverableTurns(nextLocalTurns), + } + }) + case "SET_SYNC_STATE": return updateSessionInState(state, action.conversationId, (current) => ({ ...current, @@ -693,6 +853,10 @@ function reducer( ...to, ...from, conversationId: action.toConversationId, + recoveryConversationId: + to.recoveryConversationId ?? + from.recoveryConversationId ?? + (action.toConversationId > 0 ? action.toConversationId : null), detail: to.detail ?? from.detail, detailLoading: to.detailLoading || from.detailLoading, detailError: to.detailError ?? from.detailError, @@ -809,6 +973,10 @@ interface ConversationRuntimeContextValue { isLive?: boolean ) => void setExternalId: (conversationId: number, externalId: string | null) => void + setRecoveryConversationId: ( + conversationId: number, + recoveryConversationId: number | null + ) => void setSyncState: ( conversationId: number, syncState: ConversationSyncState @@ -836,6 +1004,23 @@ export function ConversationRuntimeProvider({ }) { const [state, dispatch] = useReducer(reducer, initialState) + useEffect(() => { + for (const session of state.byConversationId.values()) { + const recoveryConversationId = session.recoveryConversationId + if (recoveryConversationId == null) { + continue + } + + const recoverableTurns = collectRecoverableUserTurns(session) + if (recoverableTurns.length > 0) { + saveRecoverableOptimisticTurns(recoveryConversationId, recoverableTurns) + continue + } + + clearRecoverableOptimisticTurns(recoveryConversationId) + } + }, [state.byConversationId]) + const stateRef = useRef(state) // eslint-disable-next-line react-hooks/refs -- stateRef is only read in callbacks, not during render stateRef.current = state @@ -911,12 +1096,12 @@ export function ConversationRuntimeProvider({ const session = stateRef.current.byConversationId.get(conversationId) if (session?.detail || session?.detailLoading) return - // Skip fetch if session has active data (ongoing conversation) + // Allow cold detail loads even when the runtime restored recoverable + // failed-user turns from localStorage. Only skip when there is truly + // in-flight data that would make the fetch immediately stale. if ( session && - (session.optimisticTurns.length > 0 || - session.liveMessage !== null || - session.localTurns.length > 0) + (session.optimisticTurns.length > 0 || session.liveMessage) ) { return } @@ -1077,6 +1262,17 @@ export function ConversationRuntimeProvider({ [] ) + const setRecoveryConversationId = useCallback( + (conversationId: number, recoveryConversationId: number | null) => { + dispatch({ + type: "SET_RECOVERY_CONVERSATION_ID", + conversationId, + recoveryConversationId, + }) + }, + [] + ) + const setSyncState = useCallback( (conversationId: number, syncState: ConversationSyncState) => { dispatch({ type: "SET_SYNC_STATE", conversationId, syncState }) @@ -1103,10 +1299,21 @@ export function ConversationRuntimeProvider({ ) const removeConversation = useCallback((conversationId: number) => { + const recoveryConversationId = + stateRef.current.byConversationId.get(conversationId) + ?.recoveryConversationId ?? null + if (recoveryConversationId != null) { + clearRecoverableOptimisticTurns(recoveryConversationId) + } dispatch({ type: "REMOVE_CONVERSATION", conversationId }) }, []) const reset = useCallback(() => { + for (const session of stateRef.current.byConversationId.values()) { + if (session.recoveryConversationId != null) { + clearRecoverableOptimisticTurns(session.recoveryConversationId) + } + } dispatch({ type: "RESET" }) }, []) @@ -1122,6 +1329,7 @@ export function ConversationRuntimeProvider({ appendOptimisticTurn, setLiveMessage, setExternalId, + setRecoveryConversationId, setSyncState, migrateConversation, setPendingCleanup, @@ -1139,6 +1347,7 @@ export function ConversationRuntimeProvider({ appendOptimisticTurn, setLiveMessage, setExternalId, + setRecoveryConversationId, setSyncState, migrateConversation, setPendingCleanup, diff --git a/src/lib/conversation-recovery-storage.ts b/src/lib/conversation-recovery-storage.ts new file mode 100644 index 00000000..95d96c94 --- /dev/null +++ b/src/lib/conversation-recovery-storage.ts @@ -0,0 +1,115 @@ +import type { ContentBlock, MessageTurn } from "@/lib/types" + +const STORAGE_KEY_PREFIX = "codeg:conversation-recovery-optimistic-turns" + +function buildStorageKey(conversationId: number): string { + return `${STORAGE_KEY_PREFIX}:${conversationId}` +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null +} + +function isOptimisticAnchorId(anchorId: unknown): anchorId is string { + return typeof anchorId === "string" && anchorId.startsWith("optimistic:") +} + +function isRecoverableContentBlock(block: unknown): block is ContentBlock { + if (!isRecord(block) || typeof block.type !== "string") { + return false + } + + switch (block.type) { + case "text": + return typeof block.text === "string" + case "image": + return ( + typeof block.data === "string" && typeof block.mime_type === "string" + ) + default: + return false + } +} + +function normalizeRecoverableTurn(turn: unknown): MessageTurn | null { + if (!isRecord(turn)) return null + if (turn.role !== "user") return null + if (typeof turn.id !== "string" || typeof turn.timestamp !== "string") { + return null + } + if (!Array.isArray(turn.blocks)) return null + + const blocks = turn.blocks.filter(isRecoverableContentBlock) + if (blocks.length !== turn.blocks.length) return null + + if (!isOptimisticAnchorId(turn.anchor_id)) { + return null + } + + return { + id: turn.id, + anchor_id: turn.anchor_id, + role: "user", + blocks, + timestamp: turn.timestamp, + } +} + +export function loadRecoverableOptimisticTurns( + conversationId: number +): MessageTurn[] { + if (!Number.isFinite(conversationId) || conversationId <= 0) { + return [] + } + if (typeof window === "undefined") return [] + + try { + const raw = localStorage.getItem(buildStorageKey(conversationId)) + if (!raw) return [] + const parsed: unknown = JSON.parse(raw) + if (!Array.isArray(parsed)) return [] + return parsed + .map((turn) => normalizeRecoverableTurn(turn)) + .filter((turn): turn is MessageTurn => turn !== null) + } catch { + return [] + } +} + +export function saveRecoverableOptimisticTurns( + conversationId: number, + turns: MessageTurn[] +): void { + if (!Number.isFinite(conversationId) || conversationId <= 0) { + return + } + if (typeof window === "undefined") return + + const recoverableTurns = turns.filter( + (turn) => turn.role === "user" && isOptimisticAnchorId(turn.anchor_id) + ) + + try { + const key = buildStorageKey(conversationId) + if (recoverableTurns.length === 0) { + localStorage.removeItem(key) + return + } + localStorage.setItem(key, JSON.stringify(recoverableTurns)) + } catch { + /* ignore */ + } +} + +export function clearRecoverableOptimisticTurns(conversationId: number): void { + if (!Number.isFinite(conversationId) || conversationId <= 0) { + return + } + if (typeof window === "undefined") return + + try { + localStorage.removeItem(buildStorageKey(conversationId)) + } catch { + /* ignore */ + } +}