clusterd: Unified Timely runtime for compute and storage #35851
Draft
antiguru wants to merge 9 commits into MaterializeInc:main
Conversation
Add an experimental `--unified-runtime` flag to clusterd that runs both compute and storage dataflows within a single Timely runtime, sharing worker threads and communication infrastructure.

The key motivation is storage introspection: with a unified runtime, compute's existing logging dataflows (mz_scheduling_elapsed, mz_arrangement_sizes, etc.) automatically capture events from storage dataflows, with no ID collisions and no schema changes. A separate approach of forwarding events via mpsc channels was rejected because both Timely instances allocate IDs starting from 0, making it impossible to disambiguate event origins.

Implementation approach:

- Refactor compute Worker to expose step methods and make key types pub (CommandReceiver, ResponseSender, ActiveComputeState, etc.)
- Move storage's handle_internal_storage_command to StorageState with an explicit TimelyWorker parameter, so it can be called without the storage Worker wrapper
- Add unified.rs module in clusterd that sets up networking once, creates two sets of client channels, and runs a merged event loop
- Command distribution is unchanged: compute uses its Exchange-based broadcast, storage uses its internal command sequencer
- Controllers connect via separate gRPC connections as before

The unified loop owns the single TimelyWorker reference and alternates between compute and storage step functions each iteration. The flag defaults to off; both code paths coexist.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
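The alternating merged loop described above can be sketched as follows. This is a simplified illustration, not the PR's code: the command enums, the `run_unified` function, and the use of plain `std::sync::mpsc` channels are all stand-ins for the real compute/storage client channels and the owned TimelyWorker.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// Hypothetical stand-ins for the real compute and storage command types.
enum ComputeCmd { Step }
enum StorageCmd { Step }

// Sketch of the merged event loop: a single owner of the (here simulated)
// Timely worker alternates between the compute and storage sides each
// iteration, counting how many commands each side processed.
fn run_unified(
    compute_rx: Receiver<ComputeCmd>,
    storage_rx: Receiver<StorageCmd>,
) -> (usize, usize) {
    let (mut compute_steps, mut storage_steps) = (0, 0);
    let (mut compute_done, mut storage_done) = (false, false);
    while !(compute_done && storage_done) {
        if !compute_done {
            match compute_rx.try_recv() {
                Ok(ComputeCmd::Step) => compute_steps += 1, // compute's turn
                Err(TryRecvError::Disconnected) => compute_done = true,
                Err(TryRecvError::Empty) => {} // real loop would park here
            }
        }
        if !storage_done {
            match storage_rx.try_recv() {
                Ok(StorageCmd::Step) => storage_steps += 1, // storage's turn
                Err(TryRecvError::Disconnected) => storage_done = true,
                Err(TryRecvError::Empty) => {}
            }
        }
    }
    (compute_steps, storage_steps)
}
```

The key property the sketch preserves is that neither side ever blocks the other: each iteration gives both command paths a bounded amount of work on the shared worker.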
Turn on the unified Timely runtime as the default behavior. This can be disabled with `--unified-runtime=false` if needed. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
The compute protocol sends Hello commands to all workers via split_command, but the command_channel dataflow asserts that non-zero workers only receive UpdateConfiguration. In standalone mode, ClusterClient intercepts Hello and converts it to a connection setup (creating new per-worker channels) before anything reaches the command channel. Fix the unified runtime to use ClusterClient + TimelyContainer instead of raw Partitioned<LocalClient>, so Hello is properly intercepted. Also make TimelyContainer fields pub and add a worker_threads field so that unified containers sharing the same worker guards can both provide thread handles for unparking. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
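The invariant the command_channel dataflow asserts can be illustrated with a small check. The enum and function names here are hypothetical; the point is that once Hello is intercepted by the client layer, only worker 0 may see anything other than UpdateConfiguration.

```rust
// Hypothetical command enum mirroring the invariant described above.
#[derive(Debug, Clone, PartialEq)]
enum ComputeCmd {
    Hello,
    UpdateConfiguration,
    CreateDataflow,
}

// Sketch of the command_channel check: returns Err where the real
// dataflow would assert. Non-zero workers must only ever receive
// UpdateConfiguration through the command channel.
fn check_command_for_worker(
    worker_index: usize,
    cmd: &ComputeCmd,
) -> Result<(), String> {
    if worker_index != 0 && *cmd != ComputeCmd::UpdateConfiguration {
        return Err(format!("worker {worker_index} got {cmd:?}"));
    }
    Ok(())
}
```

Routing a raw Hello to worker 1 trips this check, which is why the unified runtime must go through ClusterClient's interception rather than a raw Partitioned client.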
Replace the blocking storage reconnection path with an async channel adapter (spawn_storage_channel_adapter) that mirrors compute's pattern:

1. Receives new client connections from ClusterClient asynchronously
2. Buffers commands until InitializationComplete, collecting them
3. Delivers the reconciliation batch to the worker through a channel
4. Forwards ongoing commands and responses through persistent channels

The worker never blocks waiting for a new storage client; it checks for reconciliation batches with try_recv each iteration.

Also extract StorageState::reconcile_with_commands from Worker::reconcile so the reconciliation logic can be called without the Worker wrapper (and without the blocking drain loop).

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
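The two halves of the adapter pattern can be sketched like this. The command type and both function names are illustrative stand-ins, assuming plain `std::sync::mpsc` channels in place of the real async plumbing.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// Hypothetical stand-in for the real storage command type.
#[derive(Clone, PartialEq, Debug)]
enum Cmd {
    RunIngestion(u64),
    InitializationComplete,
}

// Adapter side (sketch): buffer commands from a new client connection
// until InitializationComplete, then hand the whole batch to the worker.
fn collect_reconciliation_batch(client_rx: &Receiver<Cmd>) -> Vec<Cmd> {
    let mut batch = Vec::new();
    while let Ok(cmd) = client_rx.recv() {
        if cmd == Cmd::InitializationComplete {
            break; // batch is complete; deliver it to the worker
        }
        batch.push(cmd);
    }
    batch
}

// Worker side (sketch): never block on a new client; poll for a
// reconciliation batch once per iteration of the main step loop.
fn poll_reconciliation(batch_rx: &Receiver<Vec<Cmd>>) -> Option<Vec<Cmd>> {
    match batch_rx.try_recv() {
        Ok(batch) => Some(batch),
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
    }
}
```

The split keeps all blocking on the adapter side, so the worker's step loop stays responsive to compute work even while a storage controller is mid-reconnect.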
Force-pushed: 4801693 to 015133a
During reconciliation, objects may be dropped before the filtered command set is applied. AllowCompaction commands for those dropped objects would hit a soft_assert in handle_storage_command. Filter them out by checking against expected_objects before applying. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
AllowCompaction may arrive for objects that don't exist on this cluster, either because they were already dropped by a prior AllowCompaction with an empty frontier, or because the controller sent the command to a cluster that never had the object. Replace the soft_assert_or_log with a silent skip to avoid panics in debug builds. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
When the storage controller reconnects, the channel adapter delivers a new reconciliation batch. But the command channel may still contain commands from the previous connection that haven't been consumed yet. These stale commands can reference objects that no longer exist after reconciliation, causing AllowCompaction panics. Fix by draining the storage command channel before applying the reconciliation batch. Also revert the soft_assert_or_log removal for AllowCompaction — the assertion should be preserved to catch real bugs; the stale command drain is the proper fix. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
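The stale-command drain is a one-liner conceptually. A minimal sketch, assuming an `mpsc`-style command channel (the function name is hypothetical):

```rust
use std::sync::mpsc::Receiver;

// Hypothetical stand-in for the real storage command type.
enum Cmd {
    AllowCompaction(u64),
}

// Sketch: discard whatever the previous connection left in the command
// channel before applying a reconciliation batch, so stale commands
// can't reference objects that reconciliation removed. Returns how many
// stale commands were dropped.
fn drain_stale_commands(rx: &Receiver<Cmd>) -> usize {
    let mut dropped = 0;
    while rx.try_recv().is_ok() {
        dropped += 1; // stale command from the previous connection
    }
    dropped
}
```

Draining before applying the batch means the assertion in the command handler only ever fires for genuinely new commands, which is why it can be kept to catch real bugs.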
During reconciliation, RunIngestion commands may be filtered out (when the object is immediately dropped). Subsequent AllowCompaction commands for the same object's subsources can arrive through the ongoing command path (after InitializationComplete), referencing objects whose RunIngestion was never applied. Handle this gracefully by skipping AllowCompaction for objects not present in the storage state. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
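The graceful skip can be sketched as a lookup-before-apply. The types here (`GlobalId` as `u64`, a frontier as `Vec<u64>`, the `StorageState` shape) are simplified stand-ins, not the real definitions:

```rust
use std::collections::BTreeMap;

// Hypothetical simplifications of the real types.
type GlobalId = u64;

struct StorageState {
    // Maps each known ingestion to its (simplified) compaction frontier.
    ingestions: BTreeMap<GlobalId, Vec<u64>>,
}

impl StorageState {
    // Sketch: AllowCompaction for an object this cluster doesn't know
    // about (e.g. its RunIngestion was filtered out during
    // reconciliation) is skipped instead of asserting. Returns whether
    // the command was applied.
    fn allow_compaction(&mut self, id: GlobalId, frontier: Vec<u64>) -> bool {
        match self.ingestions.get_mut(&id) {
            Some(f) => {
                *f = frontier;
                true
            }
            None => false, // unknown object: skip gracefully
        }
    }
}
```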
Revert to opt-in (--unified-runtime) to avoid breaking CI. The unified runtime has known issues with AllowCompaction ordering during storage reconciliation that need to be resolved before it can be enabled by default. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Summary
EXPERIMENTAL — This PR is a proof of concept. See "Known Limitations" below.
- `--unified-runtime` (can be disabled with `--unified-runtime=false`)

Motivation
Storage has no introspection today — its Timely events are discarded. Forwarding events via mpsc channels doesn't work because both Timely instances allocate IDs starting from 0, making it impossible to disambiguate origins. A unified runtime eliminates this by having a single ID allocator.
Approach
- Refactor compute `Worker` to expose step methods and make key types pub (`CommandReceiver`, `ResponseSender`, `ActiveComputeState`)
- Move `handle_internal_storage_command` to `StorageState` with an explicit `&mut TimelyWorker` parameter
- Add a `unified.rs` module in clusterd: sets up networking once, creates two sets of client channels, runs a merged event loop
- The storage channel adapter buffers commands until `InitializationComplete`, then delivers reconciliation batches

See `doc/developer/design/20260402_unified_timely_runtime.md` for the full design.

Known Limitations
Deterministic dataflow ordering. Timely requires that all workers render dataflows in the same deterministic order. In this prototype, compute and storage dataflows are rendered through independent command paths (compute via Exchange-based broadcast, storage via its internal command sequencer), and there is no canonical ordering between them. On multi-worker clusters, this can lead to workers rendering dataflows in different orders, which Timely does not tolerate. Fixing this properly likely requires unifying the command protocols at the protocol level, which is a significantly larger effort. For this reason, this PR should be considered experimental and is not safe for production use with multi-worker replicas.
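To make the limitation concrete, here is a toy illustration (not from the PR) of the invariant Timely imposes: every worker must install dataflows in the same order. With independent compute and storage command paths, two workers can observe different interleavings, which this check would reject.

```rust
// Illustration only: given each worker's observed dataflow render order,
// check that all workers agree. Timely requires this to hold; with two
// independent command paths there is no mechanism that guarantees it.
fn orders_agree(per_worker_render_order: &[Vec<&str>]) -> bool {
    per_worker_render_order
        .windows(2)
        .all(|pair| pair[0] == pair[1])
}
```

A single-worker replica trivially satisfies the check, which is consistent with the PR being unsafe only for multi-worker replicas.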
Test plan
- `cargo check` passes for mz-clusterd, mz-compute, mz-storage
- `mz_introspection.mz_dataflows`
- `mz_introspection.mz_dataflow_operators`

🤖 Generated with Claude Code