@@ -321,6 +321,21 @@ fn run_unified_worker<A: Allocate + 'static>(
321321
322322 let compute_metrics = config. compute_metrics . for_worker ( worker_id) ;
323323
324+ // --- Set up storage channel adapter ---
325+ // The adapter handles reconnections: it buffers commands until
326+ // InitializationComplete, then delivers the batch for reconciliation.
327+ // Ongoing commands and responses flow through persistent channels.
328+ let ( storage_reconcile_tx, storage_reconcile_rx) = std:: sync:: mpsc:: channel ( ) ;
329+ let ( storage_cmd_tx, storage_cmd_rx) = std:: sync:: mpsc:: channel ( ) ;
330+ let ( storage_resp_tx, storage_resp_rx) = mpsc:: unbounded_channel ( ) ;
331+ spawn_storage_channel_adapter (
332+ storage_client_rx,
333+ storage_reconcile_tx,
334+ storage_cmd_tx,
335+ storage_resp_rx,
336+ thread:: current ( ) ,
337+ ) ;
338+
324339 // --- Set up storage state ---
325340 let ( internal_cmd_tx, internal_cmd_rx) =
326341 internal_control:: setup_command_sequencer ( timely_worker) ;
@@ -400,83 +415,42 @@ fn run_unified_worker<A: Allocate + 'static>(
400415 . expect_err ( "change to first nonce" ) ;
401416 compute_worker. set_nonce ( nonce) ;
402417
403- // Wait for first storage client connection.
404- let mut storage_client_rx = storage_client_rx;
405- let Some ( ( _nonce, mut storage_cmd_rx, storage_resp_tx) ) = storage_client_rx. blocking_recv ( )
406- else {
407- return ;
408- } ;
409-
410- // Main unified loop: alternates between compute and storage steps.
411- // On compute nonce change, reconcile compute and continue.
412- // On storage channel disconnect, wait for a new storage client.
418+ // Main unified loop.
419+ // Compute reconnection: handled by spawn_channel_adapter + nonce mechanism.
420+ // Storage reconnection: handled by spawn_storage_channel_adapter which
421+ // delivers reconciliation batches through storage_reconcile_rx.
413422 loop {
414- let result = run_unified_client (
423+ let Err ( NonceChange ( nonce ) ) = run_unified_loop (
415424 & mut compute_worker,
416425 & mut storage_state,
417- & mut storage_cmd_rx,
426+ & storage_reconcile_rx,
427+ & storage_cmd_rx,
418428 & storage_resp_tx,
419429 ) ;
420- match result {
421- UnifiedLoopExit :: ComputeNonceChange ( nonce) => {
422- compute_worker. set_nonce ( nonce) ;
423- }
424- UnifiedLoopExit :: StorageDisconnect => {
425- // Wait for a new storage client connection.
426- let Some ( ( _nonce, new_cmd_rx, new_resp_tx) ) = storage_client_rx. blocking_recv ( )
427- else {
428- return ;
429- } ;
430- storage_cmd_rx = new_cmd_rx;
431- // We need to restart the loop with the new storage channels.
432- // The run_unified_client function will pick up the new channels.
433- let _ = new_resp_tx; // Will be used in next iteration
434- // TODO: Handle storage reconnection properly
435- continue ;
436- }
437- }
430+ compute_worker. set_nonce ( nonce) ;
438431 }
439432}
440433
441- enum UnifiedLoopExit {
442- ComputeNonceChange ( Uuid ) ,
443- StorageDisconnect ,
444- }
445-
446- fn run_unified_client < A : Allocate + ' static > (
434+ fn run_unified_loop < A : Allocate + ' static > (
447435 compute : & mut ComputeWorker < ' _ , A > ,
448436 storage_state : & mut StorageState ,
449- storage_cmd_rx : & mut mpsc:: UnboundedReceiver < StorageCommand > ,
437+ storage_reconcile_rx : & std:: sync:: mpsc:: Receiver < Vec < StorageCommand > > ,
438+ storage_cmd_rx : & std:: sync:: mpsc:: Receiver < StorageCommand > ,
450439 storage_resp_tx : & mpsc:: UnboundedSender < StorageResponse > ,
451- ) -> UnifiedLoopExit {
440+ ) -> Result < std :: convert :: Infallible , NonceChange > {
452441 // Reconcile compute.
453- if let Err ( NonceChange ( nonce) ) = compute. reconcile ( ) {
454- return UnifiedLoopExit :: ComputeNonceChange ( nonce) ;
455- }
456-
457- // Reconcile storage (drain until InitializationComplete).
458- {
459- let mut commands = vec ! [ ] ;
460- loop {
461- match storage_cmd_rx. blocking_recv ( ) {
462- Some ( StorageCommand :: InitializationComplete ) => break ,
463- Some ( command) => commands. push ( command) ,
464- None => return UnifiedLoopExit :: StorageDisconnect ,
465- }
466- }
467- // Apply reconciled commands.
468- // TODO: Full storage reconciliation logic (matching Worker::reconcile).
469- // For now, apply commands directly.
470- for command in commands {
471- storage_state. handle_storage_command ( command) ;
472- }
473- }
442+ compute. reconcile ( ) ?;
474443
475444 let mut last_compute_maintenance = Instant :: now ( ) ;
476445 let mut last_storage_maintenance = Instant :: now ( ) ;
477446 let mut last_stats_time = Instant :: now ( ) ;
478447
479448 loop {
449+ // --- Check for storage reconnection ---
450+ if let Ok ( commands) = storage_reconcile_rx. try_recv ( ) {
451+ storage_state. reconcile_with_commands ( commands) ;
452+ }
453+
480454 // --- Compute maintenance ---
481455 let compute_maintenance_interval = compute
482456 . compute_state
@@ -521,8 +495,9 @@ fn run_unified_client<A: Allocate + 'static>(
521495 . statistics_collection_interval ;
522496 let stats_remaining = stats_interval. saturating_sub ( last_stats_time. elapsed ( ) ) ;
523497
498+ // The storage channel adapter unparks the worker thread after every send
499+ // (reconcile batch or command), so we don't need to check storage_cmd_rx here.
524500 let can_park = compute. command_rx . is_empty ( )
525- && storage_cmd_rx. is_empty ( )
526501 && storage_state. async_worker . is_empty ( ) ;
527502
528503 let sleep = min_durations ( & [ compute_sleep, storage_sleep, Some ( stats_remaining) ] ) ;
@@ -537,9 +512,7 @@ fn run_unified_client<A: Allocate + 'static>(
537512 timer. observe_duration ( ) ;
538513
539514 // --- Compute step ---
540- if let Err ( NonceChange ( nonce) ) = compute. handle_pending_commands ( ) {
541- return UnifiedLoopExit :: ComputeNonceChange ( nonce) ;
542- }
515+ compute. handle_pending_commands ( ) ?;
543516 if let Some ( mut cs) = compute. activate_compute ( ) {
544517 cs. process_peeks ( ) ;
545518 cs. process_subscribes ( ) ;
@@ -565,20 +538,8 @@ fn run_unified_client<A: Allocate + 'static>(
565538 }
566539
567540 // Drain storage commands
568- let mut storage_disconnected = false ;
569- loop {
570- match storage_cmd_rx. try_recv ( ) {
571- Ok ( cmd) => storage_state. handle_storage_command ( cmd) ,
572- Err ( mpsc:: error:: TryRecvError :: Empty ) => break ,
573- Err ( mpsc:: error:: TryRecvError :: Disconnected ) => {
574- storage_disconnected = true ;
575- break ;
576- }
577- }
578- }
579-
580- if storage_disconnected {
581- return UnifiedLoopExit :: StorageDisconnect ;
541+ while let Ok ( cmd) = storage_cmd_rx. try_recv ( ) {
542+ storage_state. handle_storage_command ( cmd) ;
582543 }
583544
584545 // Drain async worker responses
@@ -593,6 +554,72 @@ fn run_unified_client<A: Allocate + 'static>(
593554 }
594555}
595556
557+ /// Spawn a task that bridges [`ClusterClient`] connections (delivered on `client_rx`) to persistent worker channels.
558+ ///
559+ /// For each new storage client connection:
560+ /// 1. Drains commands until `InitializationComplete`, collecting them into a batch (a client that disconnects first is skipped and its partial batch dropped)
561+ /// 2. Sends the batch through `reconcile_tx` for the worker to reconcile in one go, then unparks `worker_thread`
562+ /// 3. Forwards ongoing commands through `command_tx`, unparking `worker_thread` after each one
563+ /// 4. Forwards worker responses from `response_rx` back to the active client (a failed send to the client is ignored)
564+ ///
565+ /// This ensures the worker thread never blocks waiting for a new client connection. The task ends when `client_rx` closes or when any of the worker-side channels is closed.
566+ fn spawn_storage_channel_adapter (
567+ mut client_rx : mpsc:: UnboundedReceiver < ( // one (nonce, command receiver, response sender) tuple per client connection
568+ Uuid ,
569+ mpsc:: UnboundedReceiver < StorageCommand > ,
570+ mpsc:: UnboundedSender < StorageResponse > ,
571+ ) > ,
572+ reconcile_tx : std:: sync:: mpsc:: Sender < Vec < StorageCommand > > , // one batch per client that completes initialization
573+ command_tx : std:: sync:: mpsc:: Sender < StorageCommand > , // commands received after initialization
574+ mut response_rx : mpsc:: UnboundedReceiver < StorageResponse > , // responses coming from the worker
575+ worker_thread : thread:: Thread , // unparked after every successful send to the worker
576+ ) {
577+ mz_ore:: task:: spawn (
578+ || "storage-channel-adapter" ,
579+ async move {
580+ while let Some ( ( _nonce, mut cmd_rx, resp_tx) ) = client_rx. recv ( ) . await { // the nonce is not used by the adapter
581+ // Phase 1: Buffer commands until InitializationComplete. The marker itself is not added to the batch.
582+ let mut commands = vec ! [ ] ;
583+ let init_complete = loop {
584+ match cmd_rx. recv ( ) . await {
585+ Some ( StorageCommand :: InitializationComplete ) => break true ,
586+ Some ( cmd) => commands. push ( cmd) ,
587+ None => break false , // client channel closed before initialization finished
588+ }
589+ } ;
590+ if !init_complete {
591+ continue ; // discard the partial batch and wait for the next client
592+ }
593+
594+ // Deliver reconciliation batch to the worker. NOTE(review): the batch and later commands use separate channels, so the channels themselves do not preserve their relative order — confirm the worker loop tolerates this.
595+ if reconcile_tx. send ( commands) . is_err ( ) {
596+ break ; // reconcile receiver dropped; leave the client loop and end the task
597+ }
598+ worker_thread. unpark ( ) ;
599+
600+ // Phase 2: Forward ongoing commands and responses. NOTE(review): `response_rx` is only polled here, so responses queued while no client was in this phase are forwarded to the current client — confirm that is intended.
601+ loop {
602+ tokio:: select! {
603+ cmd = cmd_rx. recv( ) => match cmd {
604+ Some ( cmd) => {
605+ if command_tx. send( cmd) . is_err( ) {
606+ return ; // command receiver dropped; end the task
607+ }
608+ worker_thread. unpark( ) ;
609+ }
610+ None => break , // Client disconnected, wait for next.
611+ } ,
612+ resp = response_rx. recv( ) => match resp {
613+ Some ( resp) => { let _ = resp_tx. send( resp) ; } // send errors are deliberately ignored; the response is dropped
614+ None => return , // Worker gone.
615+ } ,
616+ }
617+ }
618+ }
619+ } ,
620+ ) ;
621+ }
622+
596623/// Returns the minimum of a set of optional durations.
597624fn min_durations ( durations : & [ Option < Duration > ] ) -> Option < Duration > {
598625 durations. iter ( ) . copied ( ) . flatten ( ) . min ( )
0 commit comments