refactor(backend): async-native HTTP and Thread+join elimination (#6369)#6377
refactor(backend): async-native HTTP and Thread+join elimination (#6369)#6377
Conversation
Greptile SummaryThis large async-migration PR eliminates
Confidence Score: 4/5Safe to merge after wiring close_all_clients() to a FastAPI shutdown event; missing hook causes resource leaks but not runtime request failures One P1 finding (close_all_clients not called on shutdown) warrants a 4 rather than 5; all async migration logic is correct, the _batch()/set_event_loop pattern is safe in background threads, and social.py correctly uses per-call clients to avoid cross-loop issues backend/main.py (add shutdown hook for close_all_clients), backend/utils/conversations/process_conversation.py (remove dead _update_personas_async) Important Files Changed
Sequence DiagramsequenceDiagram
participant App as FastAPI App
participant HC as http_client.py
participant Router as Router (auth/oauth/apps)
participant Ext as External Service
App->>HC: get_webhook_client() [lazy init on first call]
HC-->>App: shared AsyncClient (64 conn, 15s timeout)
Router->>HC: get_auth_client() / get_maps_client()
HC-->>Router: shared AsyncClient
Router->>Ext: await client.post/get(...)
Ext-->>Router: HTTP response
Note over App,HC: Shutdown (MISSING — main.py not updated)
App--xHC: close_all_clients() [never called]
HC->>HC: await each client.aclose()
Note over App: Background thread path
App->>App: threading.Thread(target=update_personas_async)
App->>App: loop = asyncio.new_event_loop()
App->>App: asyncio.set_event_loop(loop)
App->>App: loop.run_until_complete(_batch())
Note over App: fresh AsyncClient inside get_twitter_timeline
App->>App: loop.close()
|
| async def close_all_clients(): | ||
| """Close all shared HTTP clients. Call at app shutdown.""" | ||
| global _webhook_client, _maps_client, _auth_client, _stt_client | ||
| for client in (_webhook_client, _maps_client, _auth_client, _stt_client): | ||
| if client is not None: | ||
| try: | ||
| await client.aclose() | ||
| except Exception as e: | ||
| logger.warning(f"Error closing HTTP client: {e}") | ||
| _webhook_client = None | ||
| _maps_client = None | ||
| _auth_client = None | ||
| _stt_client = None |
There was a problem hiding this comment.
close_all_clients() never registered in main.py shutdown
main.py was not modified in this PR, so no @app.on_event('shutdown') or lifespan handler calls this function. All four shared clients (_webhook_client, _maps_client, _auth_client, _stt_client) will be abandoned on process exit, leaking OS-level TCP connections and generating ResourceWarning noise.
Add to main.py:
from utils.http_client import close_all_clients
@app.on_event('shutdown')
async def shutdown_event():
await close_all_clients()
Live Test Evidence (L1 + L2) — CP9A/CP9BL1: Backend Standalone (curl + Python module verification)Build: HTTP Endpoints (all 200):
P1 http_client.py — Circuit Breaker + Semaphore + Latest-Wins: P2 executors.py — Critical + Storage Executors: P3 webhooks.py — Async delivery: P4-P6: async geocoding (httpx), sync.py (critical_executor.submit() x4, no threading.Thread), usage tracking (REALTIME_INTEGRATIONS feature constant) Non-happy paths: CB OPEN blocks requests, HALF_OPEN allows 1 probe then blocks, failure reopens, invalid URL fallback, close_all_clients resets semaphore cache L2: Backend + Pusher Integrated (wscat + curl)Build: Backend on :10241 ( wscat to /v4/listen: Backend logs (end-to-end): Pusher logs (received): L1 SynthesisAll 6 changed paths (P1-P6) proven functional standalone: http_client circuit breaker state machine, executor thread pools, async webhooks, async geocoding, sync.py executor migration, usage tracking. Non-happy paths verified (CB OPEN/HALF_OPEN, stale latest-wins, invalid URL fallback, shutdown cleanup). L2 SynthesisBackend + Pusher integrated via WebSocket. End-to-end flow proven: client → /v4/listen WS → Deepgram STT connection → Pusher WS ( by AI for @beastoin |
Test Results (post-review cycle)All tests run via
Coverage highlightsCoordinator deadlock prevention (6 tests)
Circuit breaker boundary (33 tests)
Lint script functional detection (5 tests)
Async webhook structural (6 tests)
Live testing (L1/L2)
by AI for @beastoin |
Live Test Evidence (L1 + L2) — CP9A/CP9B (post-review-cycle rerun)L1: Backend Standalone (curl + Python module verification)Build: HTTP Endpoints (all 200):
P1 http_client.py — Circuit Breaker + Semaphore + Latest-Wins: P2 executors.py — Critical + Storage Executors: P3 webhooks.py — Async delivery: P4 async geocoding: P5 sync.py: no threading.Thread, uses executors. Coordinator ( P6 usage tracking: P7 pusher.py coordinator: Non-happy paths: L2: Backend + Pusher Integrated (WebSocket + curl)Build: Backend on :10240 ( WebSocket to /v4/listen (Python websockets client): Backend logs (end-to-end): Pusher logs (received): HTTP endpoints with both services (all 200):
L1 SynthesisAll 7 changed paths (P1-P7) proven functional standalone: http_client circuit breaker full state machine (CLOSED→OPEN→HALF_OPEN→CLOSED, HALF_OPEN→OPEN on failed probe), executor thread pools with named threads and parallel execution, async webhooks (4 functions verified as async coroutines using httpx not requests), async geocoding, sync.py executor migration with deadlock-safe coordinator, usage tracking feature constant, pusher coordinator deadlock fix. Non-happy paths verified for CB OPEN/HALF_OPEN transitions and latest-wins stale version dropping. L2 SynthesisBackend + Pusher integrated via WebSocket. End-to-end flow proven: client → /v4/listen WS → Deepgram STT connection → Pusher WS ( by AI for @beastoin |
Clean-sweep review cycle resultsChanges made4 additional files fixed to complete the async migration:
Review cycle
Test evidence16 new structural tests in
All pass. Lint: Live test (L1+L2)
by AI for @beastoin |
Review cycle complete — all checkpoints passedCP7 — Reviewer approved (iteration 2)
CP8 — Tester approved (iteration 2)
CP9A — L1 live test: Backend standalone
CP9B — L2 live test: Backend + emulator
Migration summary: 22+ files, zero
|
L2 Live Verification — Changed-Path Coverage ChecklistBackend: local dev (port 10240, worktree
Global checks (from unit tests)
Backend startup
L2 Evidence SynthesisAll 26 changed code paths verified (P1-P26). HTTP API paths tested via curl with ADMIN_KEY auth against running backend. WebSocket endpoint confirmed route registration via 403 auth gate. Source-verified paths confirmed httpx migration and executor wiring. Unit test suite (61 tests in by AI for @beastoin |
L2 WebSocket Live Test —
|
L2 Full Lifecycle Test —
|
| Component | Evidence |
|---|---|
| Backend WS handler | 300s session, 3000 chunks received |
| Deepgram streaming | Connected, 109 segments returned |
| VAD gate (dev GKE) | speech/hangover transitions 0ms→300000ms, 78.9% speech ratio |
| Conversation lifecycle | 120s timeout triggered, _process_conversation called |
| Pusher processing | Received conversation, ran LLM post-processing |
| LLM summarization | Title + overview generated |
| GCS audio upload | 5 chunks (60s batches), 303s total |
| Firestore write | Conversation status=completed, 15 segments stored |
| httpx clients | All outbound HTTP via shared pools (zero requests) |
| Executors | critical_executor + storage_executor (zero threading.Thread) |
by AI for @beastoin
ba0130b to
7d2f552
Compare
…learnings from PR #6377 Co-Authored-By: Claude Opus 4.6 <[email protected]>
d0053eb to
919b336
Compare
…learnings from PR #6377 Co-Authored-By: Claude Opus 4.6 <[email protected]>
919b336 to
62149aa
Compare
…learnings from PR #6377 Co-Authored-By: Claude Opus 4.6 <[email protected]>
Phase 1: Webhook & geocoding — requests.post → shared httpx.AsyncClient Phase 2: Auth/OAuth/social — blocking requests → async httpx, time.sleep → asyncio.sleep Phase 3: Thread+join → ThreadPoolExecutor.map + asyncio.gather with _batch() wrapper Phase 4: Async STT variants with asyncio.to_thread file I/O offload Phase 6: AST-based lint script for async-blocker regression prevention Key changes: - utils/http_client.py: 4 shared httpx clients with connection pooling - social.py: local httpx.AsyncClient per call (avoids cross-loop RuntimeError) - oauth.py: httpx.RequestError catch for transport failures - sync.py: fixed NameError from removed chunk_threads - apps.py + process_conversation.py: asyncio.gather wrapped in async _batch() helper with set_event_loop for background thread safety 30 unit tests, 0 lint violations, full test.sh passes. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…6369) Introduces critical_executor (4 workers) and storage_executor (2 workers) replacing ad-hoc ThreadPoolExecutor creation throughout the codebase. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…client (Lane 1) (#6369) Adds WebhookCircuitBreaker (CLOSED->OPEN->HALF_OPEN state machine), per-service asyncio.Semaphore for bounded concurrency, and latest-wins dropping pattern for audio byte webhook calls. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…+ semaphore (#6369) Converts conversation_created_webhook and day_summary_webhook from sync requests.post to async httpx. All webhook functions now use circuit breakers and bounded semaphore concurrency. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ircuit breaker (#6369) Converts from sync Thread+requests.post to async def with asyncio.gather, httpx, circuit breakers, semaphore, and latest-wins for audio byte calls. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…rocess_conversation (#6369) Replaces 7 fire-and-forget threading.Thread().start() calls with critical_executor.submit() for bounded concurrency and proper lifecycle. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ecutor in RAG (#6369) Migrates 3 inline ThreadPoolExecutor usages to the shared critical_executor. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…gather (#6369) Replaces ad-hoc ThreadPoolExecutor creation with storage_executor, critical_executor, and asyncio.gather for VAD/transcription work. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…integrations (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…rations (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…tegrations (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…l_integrations (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
… executors (#6369) 25 tests covering WebhookCircuitBreaker state machine, registry, latest-wins dropping pattern, semaphore getters, and shared executor functionality. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ns (#6369) Updates 4 tests to use asyncio.run() for now-async trigger_external_integrations and conversation_created_webhook functions. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…alls (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…executor (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…rogress (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ersona update (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
… update (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…pped generation (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ile cleanup (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…audio cleanup (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…or webhooks (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…cio.sleep (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
Co-Authored-By: Claude Opus 4.6 <[email protected]>
…rt to top level (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…les (#6369) 59 structural tests verifying zero import requests in production code, proper executor lane selection, and httpx behavioral parity. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…ebhook in asyncio.run (#6369) Reviewer caught: day_summary_webhook is async def — submitting to ThreadPoolExecutor returns a coroutine that never executes. Wrap in asyncio.run(). Also switch from critical_executor to storage_executor since this is batch cron work. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…vious behavior (#6369) Reviewer caught: old code used timeout=30 per-call but shared client had 15s, breaking slow but valid partner webhooks. Co-Authored-By: Claude Opus 4.6 <[email protected]>
Reviewer caught: lazy import for generate_wrapped_2025 was unnecessary — no circular dependency exists. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…storage_executor) (#6369) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…onfig tests (#6369) Tester-requested additions: - Webhook client read timeout = 30s, connect timeout = 2s - critical_executor = 8 workers, storage_executor = 4 workers - Notification webhook uses storage_executor.submit(asyncio.run, ...) Co-Authored-By: Claude Opus 4.6 <[email protected]>
…learnings from PR #6377 Co-Authored-By: Claude Opus 4.6 <[email protected]>
Co-Authored-By: Claude Opus 4.6 <[email protected]>
…es router New code on main used threading.Thread for update_personas_async in an async function. Migrated to loop.run_in_executor(critical_executor) to stay consistent with the 3-lane async architecture. Co-Authored-By: Claude Opus 4.6 <[email protected]>
…tests Main added get_user_language_preference() calls to _get_structured but the test stubs were not updated, causing 5 test failures. Co-Authored-By: Claude Opus 4.6 <[email protected]>
62149aa to
f7964d6
Compare
Summary
Complete elimination of
requestslibrary andthreading.Threadfrom all backend production code, enforcing the 3-lane async I/O architecture consistently across 22 files.Lane 1 —
requests→httpxmigrations (12 files)utils/other/hume.pyrequests.post→httpx.postwithfollow_redirects=True, proper exception hierarchyrouters/sync.pyrequests.get→httpx.getwith float timeout in_download_audio_bytesutils/app_integrations.pyrequests.get→httpx.getfor GitHub docs fetchingutils/apps.pyrequests.get→httpx.getfor manifest fetching, moved import to top levelutils/stt/speaker_embedding.pyrequests.post→httpx.postfor embedding API (2 call sites)utils/stt/vad.pyrequests.post→httpx.postfor hosted VADutils/stt/speech_profile.pyrequests.post→httpx.postfor speech profile matchingutils/conversations/location.pyrequests.get→httpx.getfor Google Maps geocodingrouters/calendar_onboarding.pyrequests→httpx+ threading → executorutils/retrieval/tools/calendar_tools.pyrequests→httpx+time.sleep→asyncio.sleeputils/retrieval/tools/google_utils.pyrequests→httpxfor OAuth token refreshutils/retrieval/tools/perplexity_tools.pyrequests→ asynchttpxfor web searchLane 2 —
threading.Thread→ executor migrations (10 files)routers/memories.pycritical_executorrouters/imports.pystorage_executorrouters/action_items.pycritical_executorrouters/chat.pycritical_executorrouters/developer.pycritical_executorrouters/mcp.pycritical_executorrouters/wrapped.pycritical_executorutils/chat.pystorage_executorutils/conversations/postprocess_conversation.pystorage_executorutils/other/notifications.pycritical_executorLane 2 — Ad-hoc ThreadPoolExecutor → shared executor (1 file)
utils/other/storage.pyThreadPoolExecutor(max_workers=10)withstorage_executorLane 2 — Batch rebuild executor fix (1 file)
utils/llm/knowledge_graph.pycritical_executor→storage_executorfor batch rebuild + added missingimport threadingLane 3 — Lint & Documentation
python scripts/lint_async_blockers.pyreports zero violationsCLAUDE.mdandbackend/CLAUDE.mdwith 3-lane async I/O architecture docshttpx behavioral parity notes
follow_redirects=Trueadded whererequestsfollowed redirects by default (Hume API)float(httpx requirement)requests.Timeout→httpx.TimeoutException,requests.RequestException→httpx.RequestError,requests.TooManyRedirects→httpx.TooManyRedirectsTest plan
test_clean_sweep_migrations.pyverify:import requestsacross all routers/ and utils/ production codethreading.Threadin routers/test_short_audio_embedding.pymock fromrequests.post→httpx.posttest_conversation_source_unknownfailures unrelated)Closes #6369
🤖 Generated with Claude Code
by AI for @beastoin