
clusterd: Unified Timely runtime for compute and storage #35851

Draft
antiguru wants to merge 9 commits into MaterializeInc:main from antiguru:worktree-sunny-puzzling-unicorn

Conversation


@antiguru antiguru commented Apr 2, 2026

Summary

EXPERIMENTAL — This PR is a proof of concept. See "Known Limitations" below.

  • Add a unified Timely runtime that runs both compute and storage dataflows in a single Timely cluster, sharing worker threads and communication infrastructure
  • Gate it behind a --unified-runtime flag (initially enabled by default in this PR; a later commit reverts it to opt-in to avoid breaking CI)
  • Compute's existing logging dataflows automatically capture storage Timely/Differential events — no schema changes, no ID collisions

Motivation

Storage has no introspection today — its Timely events are discarded. Forwarding events via mpsc channels doesn't work because both Timely instances allocate IDs starting from 0, making it impossible to disambiguate origins. A unified runtime eliminates this by having a single ID allocator.

Approach

  • Refactor compute Worker to expose step methods and make key types pub (CommandReceiver, ResponseSender, ActiveComputeState)
  • Move storage's handle_internal_storage_command to StorageState with an explicit &mut TimelyWorker parameter
  • New unified.rs module in clusterd: sets up networking once, creates two sets of client channels, runs a merged event loop
  • Non-blocking storage reconnection via a channel adapter that buffers commands until InitializationComplete, then delivers reconciliation batches
  • Command distribution unchanged: compute uses Exchange-based broadcast, storage uses internal command sequencer
  • Controllers connect via separate gRPC connections as before

See doc/developer/design/20260402_unified_timely_runtime.md for the full design.

Known Limitations

Deterministic dataflow ordering. Timely requires that all workers render dataflows in the same deterministic order. In this prototype, compute and storage dataflows are rendered through independent command paths (compute via Exchange-based broadcast, storage via its internal command sequencer), and there is no canonical ordering between them. On multi-worker clusters, this can lead to workers rendering dataflows in different orders, which Timely does not tolerate. Fixing this properly likely requires unifying the command protocols at the protocol level, which is a significantly larger effort. For this reason, this PR should be considered experimental and is not safe for production use with multi-worker replicas.

Test plan

  • cargo check passes for mz-clusterd, mz-compute, mz-storage
  • Storage dataflows visible in mz_introspection.mz_dataflows
  • Storage operators visible in mz_introspection.mz_dataflow_operators
  • Full SLT/testdrive suite (pending — experimental)
  • Multi-worker correctness (blocked on deterministic ordering)

🤖 Generated with Claude Code



github-actions bot commented Apr 2, 2026

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

antiguru and others added 4 commits April 3, 2026 19:17
Add an experimental `--unified-runtime` flag to clusterd that runs both
compute and storage dataflows within a single Timely runtime, sharing
worker threads and communication infrastructure.

The key motivation is storage introspection: with a unified runtime,
compute's existing logging dataflows (mz_scheduling_elapsed,
mz_arrangement_sizes, etc.) automatically capture events from storage
dataflows with no ID collisions and no schema changes. A separate
approach of forwarding events via mpsc channels was rejected because
both Timely instances allocate IDs starting from 0, making it impossible
to disambiguate event origins.

Implementation approach:
- Refactor compute Worker to expose step methods and make key types pub
  (CommandReceiver, ResponseSender, ActiveComputeState, etc.)
- Move storage's handle_internal_storage_command to StorageState with an
  explicit TimelyWorker parameter, so it can be called without the
  storage Worker wrapper
- Add unified.rs module in clusterd that sets up networking once,
  creates two sets of client channels, and runs a merged event loop
- Command distribution is unchanged: compute uses its Exchange-based
  broadcast, storage uses its internal command sequencer
- Controllers connect via separate gRPC connections as before

The unified loop owns the single TimelyWorker reference and alternates
between compute and storage step functions each iteration. The flag
defaults to off; both code paths coexist.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Turn on the unified Timely runtime as the default behavior. This can be
disabled with `--unified-runtime=false` if needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
The compute protocol sends Hello commands to all workers via
split_command, but the command_channel dataflow asserts that non-zero
workers only receive UpdateConfiguration. In standalone mode,
ClusterClient intercepts Hello and converts it to a connection setup
(creating new per-worker channels) before anything reaches the command
channel.

Fix the unified runtime to use ClusterClient + TimelyContainer instead
of raw Partitioned<LocalClient>, so Hello is properly intercepted.

Also make TimelyContainer fields pub and add a worker_threads field so
that unified containers sharing the same worker guards can both provide
thread handles for unparking.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Replace the blocking storage reconnection path with an async channel
adapter (spawn_storage_channel_adapter) that mirrors compute's pattern:

1. Receives new client connections from ClusterClient asynchronously
2. Buffers commands until InitializationComplete, collecting them
3. Delivers the reconciliation batch to the worker through a channel
4. Forwards ongoing commands and responses through persistent channels

The worker never blocks waiting for a new storage client — it checks
for reconciliation batches with try_recv each iteration.

Also extract StorageState::reconcile_with_commands from Worker::reconcile
so the reconciliation logic can be called without the Worker wrapper
(and without the blocking drain loop).

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
antiguru force-pushed the worktree-sunny-puzzling-unicorn branch from 4801693 to 015133a on April 3, 2026 at 17:19
antiguru and others added 5 commits April 3, 2026 19:43
During reconciliation, objects may be dropped before the filtered
command set is applied. AllowCompaction commands for those dropped
objects would hit a soft_assert in handle_storage_command. Filter them
out by checking against expected_objects before applying.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
AllowCompaction may arrive for objects that don't exist on this cluster,
either because they were already dropped by a prior AllowCompaction with
an empty frontier, or because the controller sent the command to a
cluster that never had the object. Replace the soft_assert_or_log with
a silent skip to avoid panics in debug builds.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
When the storage controller reconnects, the channel adapter delivers a
new reconciliation batch. But the command channel may still contain
commands from the previous connection that haven't been consumed yet.
These stale commands can reference objects that no longer exist after
reconciliation, causing AllowCompaction panics.

Fix by draining the storage command channel before applying the
reconciliation batch.

Also revert the soft_assert_or_log removal for AllowCompaction — the
assertion should be preserved to catch real bugs; the stale command
drain is the proper fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
During reconciliation, RunIngestion commands may be filtered out (when
the object is immediately dropped). Subsequent AllowCompaction commands
for the same object's subsources can arrive through the ongoing command
path (after InitializationComplete), referencing objects whose
RunIngestion was never applied. Handle this gracefully by skipping
AllowCompaction for objects not present in the storage state.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
Revert to opt-in (--unified-runtime) to avoid breaking CI. The unified
runtime has known issues with AllowCompaction ordering during storage
reconciliation that need to be resolved before it can be enabled by
default.

Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
