
Workflow as code v2 with TS and Python examples #8172

Draft
rubenfiszel wants to merge 11 commits into main from workflows-as-code-v2



@rubenfiszel rubenfiszel commented Mar 1, 2026

Summary

Workflow-as-code v2: checkpoint/replay orchestration with task(), step(), and workflow() primitives for both TypeScript and Python. The parent script truly suspends (releases its worker slot entirely) while child jobs execute, then resumes to collect results. Proven to work on a single worker without deadlocking.


Two Primitives: task() vs step()

| | task() | step() |
|---|---|---|
| Execution | Separate child job (own process) | Inline in parent process |
| Overhead | Full job lifecycle (queue, worker pickup, install, run) | Just the function call + one DB write |
| Parallelism | Yes: Promise.all/asyncio.gather runs children concurrently | No: sequential only |
| Use case | Heavy compute, external API calls, scripts that take seconds+ | Lightweight non-deterministic operations: timestamps, random IDs, config reads |
| Why it exists | Offload work, enable parallelism, isolate failures | Solve the replay determinism problem without child job overhead |

The Replay Determinism Problem

Glue code between task() calls re-executes on every replay. Non-deterministic operations produce different values each time:

TypeScript:

export default workflow(async (x: number) => {
  const a = await taskA(x);        // cached — runs once as child job
  const rand = Math.random();       // different on each replay!
  const b = await taskB(a + rand);  // gets different args each replay
});

Python:

@workflow
async def main(x: int):
    a = await task_a(x=x)           # cached
    rand = random.random()           # different on each replay!
    b = await task_b(x=a + rand)     # gets different args each replay

step() solves this by executing inline and checkpointing:

TypeScript:

export default workflow(async (x: number) => {
  const a = await taskA(x);
  const rand = await step("rand", () => Math.random());  // cached after first execution
  const b = await taskB(a + rand);                        // deterministic
});

Python:

@workflow
async def main(x: int):
    a = await task_a(x=x)
    rand = await step("rand", lambda: random.random())  # cached after first execution
    b = await task_b(x=a + rand)                         # deterministic

Design Decision: Why Not Accumulate?

We considered accumulating inline step results in memory and flushing them when the next task() dispatches. This avoids the extra process re-invocation but has a correctness flaw: if the process crashes between the inline step and the next persistence point, the inline result is lost. On retry, Math.random() produces a different value — defeating the purpose.

Instead, step() persists immediately: it throws StepSuspend/_StepSuspend, exits the process, Rust saves the result to the DB, and re-runs. This guarantees the cached value survives crashes.

Design Decision: Why Not a Child Job?

Making every step() a child job would be correct but wildly expensive for Date.now() (microseconds of compute, seconds of job overhead). step() runs the function in the parent's process — the only overhead is one DB write and one process re-invocation (~30ms with cached modules).


Language Support

TypeScript (Bun)

import { task, workflow, step } from "windmill-client";

const double = task(async function double(x: number) {
  return x * 2;
});

export default workflow(async (x: number = 7) => {
  const ts = await step("timestamp", () => Date.now());
  const doubled = await double(x);
  const id = await step("random_id", () => crypto.randomUUID());
  return { ts, doubled, id };
});
  • task(fn) wraps a function for child job dispatch
  • workflow(fn) marks the entry point
  • step(name, fn) executes inline with checkpoint
  • Parallel: Promise.all([taskA(), taskB()]) dispatches concurrent child jobs
  • Child mode: non-matching steps return never-resolving thenables to prevent Promise.all race conditions
  • globalThis.__wmill_wf_ctx bridges the dual module instance problem (wrapper and user script import separate copies of windmill-client)

Python

import asyncio
import time
import uuid
from wmill import workflow, task, step

@task
async def double(x: int):
    return x * 2

@workflow
async def main(x: int = 7):
    ts = await step("timestamp", lambda: time.time_ns())
    doubled = await double(x=x)
    rid = await step("random_id", lambda: str(uuid.uuid4()))
    return {"ts": ts, "doubled": doubled, "id": rid}
  • @task decorator marks functions for child job dispatch
  • @workflow decorator marks the entry point
  • step(name, fn) — async function for inline checkpoint (same semantics as TypeScript)
  • Parallel: asyncio.gather(task_a(...), task_b(...)) dispatches concurrent child jobs
  • Child mode: non-matching steps return never-resolving asyncio.Future() instances
  • Context is managed via contextvars.ContextVar (no global state needed — Python's contextvars are naturally scoped)

Python-specific design notes

  • The @task wrapper returns a coroutine synchronously (before await), so asyncio.gather() collects all pending dispatches before any coroutine starts running. When the first _suspend() coroutine executes, it collects all accumulated pending steps and raises _StepSuspend with mode "parallel".
  • _StepSuspend inherits from BaseException (not Exception) so it's never caught by bare except Exception: blocks in user code.
  • The Python wrapper calls wmill.client._run_workflow(fn, checkpoint, args) which returns a dict with type field ("complete", "dispatch", or "inline_checkpoint"), matching the TypeScript output format exactly. The Rust handle_wac_v2_output() function handles both languages identically.
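
The BaseException choice in the notes above can be checked in isolation. The following is a minimal sketch, not the SDK code: the class name StepSuspend and the helper functions are hypothetical stand-ins, and the only point shown is that a broad `except Exception:` in user code does not intercept the signal.

```python
class StepSuspend(BaseException):
    """Hypothetical stand-in for the SDK's _StepSuspend control-flow signal."""

    def __init__(self, payload: dict):
        super().__init__(payload)
        self.payload = payload


def user_code() -> str:
    # User code with a broad handler, as is common around flaky calls.
    try:
        raise StepSuspend({"mode": "inline_checkpoint", "key": "step_0"})
    except Exception:
        # Not reached: StepSuspend does not derive from Exception.
        return "swallowed"


def run_wrapper() -> dict:
    # Only the wrapper catches the signal and turns it into an output dict.
    try:
        user_code()
    except StepSuspend as suspend:
        return suspend.payload
    return {"mode": "complete"}


print(run_wrapper())
```

A bare `except:` or an explicit `except BaseException:` in user code would still swallow the signal, so this protects against the common case only.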

End-to-End Execution Flow (TypeScript Example)

Step-by-step execution

1. Parent job starts (1st bun execution)

The Rust worker detects WAC v2 (contains workflow(, task(, windmill-client). It loads the checkpoint from v2_job_status.workflow_as_code_status._checkpoint — on first run, empty: {completed_steps: {}}.

The wrapper creates WorkflowCtx(checkpoint), calls setWorkflowCtx(ctx) (also sets globalThis.__wmill_wf_ctx for cross-module access), and runs the workflow.

When step("timestamp", () => Date.now()) is called:

  • ctx._allocKey() returns step_0
  • Not in completed_steps → execute Date.now() inline
  • Throw StepSuspend({mode: "inline_checkpoint", key: "step_0", result: 1772781806614})

Wrapper catches it, writes {type: "inline_checkpoint", key: "step_0", result: 1772781806614} to result.json. Bun exits.

2. Rust handles inline checkpoint

handle_wac_v2_output() sees InlineCheckpoint:

  1. Loads parent checkpoint, adds step_0 → 1772781806614 to completed_steps
  2. Saves checkpoint to DB
  3. Resets running = false on the job queue row — this makes the job immediately eligible for pickup without waiting for the zombie detector
  4. Returns Err(WacSuspended) — worker skips completion, job stays in queue

3. Parent re-runs (2nd bun execution)

Worker picks up the job immediately. Checkpoint now has {completed_steps: {step_0: 1772781806614}}.

step("timestamp", ...) → step_0 found in cache → returns 1772781806614 (not re-executed!)

double(x) → step_1 not in cache → StepSuspend with dispatch. Wrapper writes {type: "dispatch", steps: [{key: "step_1", ...}]}. Rust creates child job, suspends parent with suspend = 1.

4. Child executes (3rd bun execution — child job)

Child loads checkpoint with _executing_key: "step_1". Runs workflow:

  • step_0: in cache → replay
  • step_1 (double): matches _executingKey → _execute_directly: true → runs x * 2 = 14 → throws StepSuspend({mode: "step_complete", result: 14})

Non-matching steps return never-resolving thenables to prevent Promise.all race conditions.

5. Result processor unsuspends parent

handle_wac_child_completion: adds step_1 → 14 to parent checkpoint, sets suspend = 0.

6. Parent re-runs (4th bun execution)

step_0: replay (1772781806614). step_1: replay (14). step("random_id", ...) → step_2 not cached → executes, throws inline_checkpoint. Rust saves, re-runs.

7. Final run (5th bun execution)

All three steps replay from cache. Workflow returns {ts: 1772781806614, doubled: 14, id: "5rbt3fz326i"}.

Verified: the timestamp is 1772781806614 on ALL 5 runs — deterministic replay works.
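
The flow above can be reproduced with a small in-memory model. This is a sketch under stated assumptions, not the Windmill implementation: Suspend, Ctx, step, task and drive are hypothetical names, the checkpoint is a plain dict instead of a DB row, and the child job is collapsed into a direct function call (so the model has 4 parent runs where the real flow has 5 process executions).

```python
import asyncio
import itertools


class Suspend(BaseException):
    """Hypothetical stand-in for StepSuspend/_StepSuspend."""

    def __init__(self, output: dict):
        super().__init__(output)
        self.output = output


class Ctx:
    def __init__(self, completed: dict):
        self.completed = completed  # step_key -> result, loaded from the checkpoint
        self.index = 0

    def alloc_key(self) -> str:
        key = f"step_{self.index}"
        self.index += 1
        return key


async def step(ctx: Ctx, name: str, fn):
    key = ctx.alloc_key()
    if key in ctx.completed:
        return ctx.completed[key]  # replay from cache
    raise Suspend({"type": "inline_checkpoint", "key": key, "result": fn()})


async def task(ctx: Ctx, fn, **kwargs):
    key = ctx.alloc_key()
    if key in ctx.completed:
        return ctx.completed[key]  # replay from cache
    raise Suspend({"type": "dispatch", "key": key, "fn": fn, "kwargs": kwargs})


counter = itertools.count(1000)  # non-deterministic stand-in: new value per call


def double(x: int) -> int:
    return x * 2


async def main(ctx: Ctx, x: int = 7):
    ts = await step(ctx, "timestamp", lambda: next(counter))
    doubled = await task(ctx, double, x=x)
    rid = await step(ctx, "random_id", lambda: next(counter))
    return {"ts": ts, "doubled": doubled, "id": rid}


def drive(workflow, **args):
    completed = {}  # plays the role of the persisted checkpoint
    runs = 0
    while True:
        runs += 1
        try:
            return asyncio.run(workflow(Ctx(completed), **args)), runs
        except Suspend as s:
            out = s.output
            if out["type"] == "inline_checkpoint":
                completed[out["key"]] = out["result"]
            else:  # dispatch: the real system would run a child job here
                completed[out["key"]] = out["fn"](**out["kwargs"])


result, runs = drive(main, x=7)
print(result, runs)
```

Each lambda is evaluated exactly once even though main() is entered four times, which is the property the walkthrough verifies with the timestamp.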


Proof: True Suspension (Single Worker)

[06:24:51] parent: queued suspend=2 | children: 2 total, 0 done
[06:24:53] parent: queued suspend=1 | children: 2 total, 1 done
[06:24:55] parent: queued suspend=0 | children: 2 total, 2 done
[06:24:59] parent: queued suspend=1 | children: 3 total, 2 done
[06:25:01] parent: queued suspend=0 | children: 3 total, 3 done
[06:25:05] parent: completed success=true

Single worker, no deadlock. The parent truly releases its worker slot.


Comparison with Other Approaches

| Approach | Suspend model | Determinism | Extra infra |
|---|---|---|---|
| Temporal | Long-running process, deterministic replay | Built-in (workflow.random(), workflow.now()) | Requires Temporal cluster |
| Inngest | Checkpoint/replay via step.run() | step.run() for everything | Requires Inngest platform |
| Windmill WAC v1 | Parent holds worker slot, polls for child | None: glue code re-executes | None |
| Windmill WAC v2 | Parent suspends via DB suspend column | step() for inline checkpoints | None: uses existing DB |

vs Temporal

Temporal requires all workflow code to be deterministic (no Date.now(), no random, no I/O in glue code), and side effects must go through SDK primitives such as workflow.sideEffect() (exact names vary by Temporal SDK). Our approach is similar but opt-in: step() plays the role of workflow.sideEffect(), and task() plays the role of workflow.executeActivity(). Key difference: no separate cluster needed.

vs WAC v1

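v1 kept the parent process alive (await waitJob(jobId)), so every waiting parent occupies a worker slot. Once all N workers are held by waiting parents (with a single worker, that happens at the first task() call), no child can start and v1 deadlocks. v2 exits the parent process, and the suspend column is the only state.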


Key Abstractions

Rust Backend (Shared by TypeScript and Python)

WacCheckpoint (wac_executor.rs)

pub struct WacCheckpoint {
    pub completed_steps: Map<String, Value>,  // step_key → result
    pub pending_steps: Option<WacPendingSteps>, // dispatched task() steps
    pub input_args: Map<String, Value>,       // original workflow args
    pub _executing_key: Option<String>,       // child mode: which step to run
}

WacOutput (wac_executor.rs)

pub enum WacOutput {
    Complete { result: Value },
    Dispatch { mode: String, steps: Vec<WacStepDispatch> },
    InlineCheckpoint { key: String, result: Value },
}

Three possible outcomes from a process execution (bun or python):

  • Complete: workflow finished, return result
  • Dispatch: needs child jobs for task() calls
  • InlineCheckpoint: step() executed inline, persist and re-run

handle_wac_v2_output (bun_executor.rs — shared)

Routes the three output types. Called from both bun and python executors:

  • Complete → return result
  • Dispatch → create child jobs, seed checkpoints with _executing_key, suspend parent (UPDATE SET suspend = N), return WacSuspended
  • InlineCheckpoint → save result to checkpoint, reset running = false for immediate re-pickup, return WacSuspended
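
The routing above can be modelled in a few lines. This is an illustrative Python model rather than the Rust code: JobRow and handle_output are hypothetical names, the queue row and checkpoint are merged into one in-memory object, and "suspended" stands in for returning WacSuspended.

```python
from dataclasses import dataclass, field


@dataclass
class JobRow:
    """Hypothetical in-memory model of the parent's queue row plus checkpoint."""

    completed_steps: dict = field(default_factory=dict)
    suspend: int = 0
    running: bool = True
    children: list = field(default_factory=list)


def handle_output(job: JobRow, output: dict):
    """Return ("done", result) or ("suspended", None), mutating the row."""
    kind = output["type"]
    if kind == "complete":
        return "done", output["result"]
    if kind == "dispatch":
        # One child per dispatched step, each seeded with its own _executing_key.
        for s in output["steps"]:
            job.children.append({"_executing_key": s["key"]})
        job.suspend = len(output["steps"])  # models UPDATE SET suspend = N
        return "suspended", None
    if kind == "inline_checkpoint":
        job.completed_steps[output["key"]] = output["result"]
        job.running = False  # eligible for immediate re-pickup
        return "suspended", None
    raise ValueError(f"unknown output type: {kind!r}")


job = JobRow()
print(handle_output(job, {"type": "inline_checkpoint", "key": "step_0", "result": 5}))
print(handle_output(job, {"type": "dispatch", "steps": [{"key": "step_1"}, {"key": "step_2"}]}))
print(job)
```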

handle_wac_child_completion (result_processor.rs)

When a child completes: find step key, add result to parent checkpoint, decrement/clear suspend. On failure: fail parent immediately.
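
As a rough model of that bookkeeping (hypothetical names, a dict instead of DB rows, and assuming one decrement per completed child), the suspend counter reaches 0 exactly when the last child reports in:

```python
def on_child_completed(parent: dict, step_key: str, result, success: bool = True) -> None:
    """Hypothetical model of child completion handling on the parent row."""
    if not success:
        parent["status"] = "failed"  # a failed child fails the parent immediately
        return
    parent["completed_steps"][step_key] = result
    parent["suspend"] = max(0, parent["suspend"] - 1)


parent = {"status": "queued", "suspend": 2, "completed_steps": {}}
on_child_completed(parent, "step_0", 14)
print(parent["suspend"])  # one child still outstanding
on_child_completed(parent, "step_1", 21)
print(parent["suspend"])  # parent is visible to workers again
```

The 2, 1, 0 progression is the same shape as the suspend values in the single-worker trace in the proof section.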

WacSuspended error (worker.rs)

When an executor returns WacSuspended, the worker returns Ok(true) and does not complete the job. For dispatch, the parent has suspend > 0 (hidden from workers). For inline checkpoints, the parent has running = false (immediately re-eligible).

TypeScript SDK (client.ts)

WorkflowCtx

  • stepIndex: shared counter for both task() and step() via _allocKey()
  • completed: cache of step_key → result
  • _executingKey: child mode — which step to execute directly
  • _nextStep(): routing for task() — cache hit, execute-directly, never-resolve, or accumulate-and-throw
  • _runInlineStep(): routing for step() — cache hit, never-resolve, or execute-and-throw

step(name, fn)

Execute inline, checkpoint result. Three paths:

  1. Cached → return immediately
  2. Child mode (non-matching) → never-resolving promise
  3. First execution → run fn(), throw StepSuspend({mode: "inline_checkpoint", key, result})

task(fn)

Wrap function for child job dispatch. In child mode with matching key: run fn(), throw StepSuspend({mode: "step_complete"}).

globalThis context sharing

Reflect.set(globalThis, "__wmill_wf_ctx", ctx) bridges the dual module instance problem (wrapper and user script import separate copies of windmill-client). Reflect.set/get survives tree-shaking by rolldown/tsdown.

Python SDK (client.py)

WorkflowCtx

  • _step_index: shared counter for both task() and step() via _alloc_key()
  • _completed: cache of step_key → result
  • _executing_key: child mode — which step to execute directly
  • _next_step(): routing for @task — cache hit, execute-directly, never-resolve, or accumulate-and-throw
  • _run_inline_step(): routing for step() — cache hit, never-resolve, or execute-and-throw

step(name, fn) (async function)

Same semantics as TypeScript. Three paths:

  1. Cached → return immediately
  2. Child mode (non-matching) → await asyncio.Future() (never resolves)
  3. First execution → run fn(), raise _StepSuspend({"mode": "inline_checkpoint", ...})
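
The three paths can be sketched as follows. This is a simplified illustration with hypothetical names (StepSuspend, Ctx, step taking an explicit ctx argument), and it assumes that any uncached step() reached in child mode is non-matching, since the executing key always belongs to a task.

```python
import asyncio


class StepSuspend(BaseException):
    """Hypothetical stand-in for _StepSuspend."""

    def __init__(self, payload: dict):
        super().__init__(payload)
        self.payload = payload


class Ctx:
    def __init__(self, completed=None, executing_key=None):
        self.completed = dict(completed or {})
        self.executing_key = executing_key  # set only in child mode
        self.index = 0

    def alloc_key(self) -> str:
        key = f"step_{self.index}"
        self.index += 1
        return key


async def step(ctx: Ctx, name: str, fn):
    key = ctx.alloc_key()
    if key in ctx.completed:  # 1. cached: return immediately
        return ctx.completed[key]
    if ctx.executing_key is not None:  # 2. child mode: never resolves
        await asyncio.get_running_loop().create_future()
    # 3. first execution: run inline, then hand the result to the wrapper
    raise StepSuspend({"mode": "inline_checkpoint", "key": key, "result": fn()})


async def demo():
    cached = await step(Ctx({"step_0": 42}), "x", lambda: 0)
    try:
        await asyncio.wait_for(step(Ctx(executing_key="step_3"), "x", lambda: 0), 0.05)
        hung = False
    except asyncio.TimeoutError:
        hung = True  # the child-mode path was still pending at the timeout
    try:
        await step(Ctx(), "x", lambda: 7)
        payload = None
    except StepSuspend as s:
        payload = s.payload
    return cached, hung, payload


print(asyncio.run(demo()))
```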

@task decorator

Marks async function for child job dispatch. The wrapper returns a coroutine synchronously, enabling asyncio.gather() to collect all pending tasks before any suspend. In child mode with matching key: calls function body directly, raises _StepSuspend({"mode": "step_complete"}).
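
The batching behaviour can be illustrated with a stripped-down decorator. This sketch covers only the first run of a parent (no cache lookup, no child mode); the names StepSuspend, Ctx, _ctx and first_run are hypothetical, and the "sequential" mode label for a single dispatch is an assumption, since the description above only names "parallel".

```python
import asyncio
import contextvars


class StepSuspend(BaseException):
    """Hypothetical stand-in for _StepSuspend."""

    def __init__(self, payload: dict):
        super().__init__(payload)
        self.payload = payload


class Ctx:
    def __init__(self):
        self.index = 0
        self.pending = []  # dispatches registered since the last suspend


_ctx = contextvars.ContextVar("wf_ctx")  # hypothetical variable name


def task(fn):
    def wrapper(**kwargs):
        # Runs synchronously at call time, before anything is awaited.
        ctx = _ctx.get()
        key = f"step_{ctx.index}"
        ctx.index += 1
        ctx.pending.append({"key": key, "name": fn.__name__, "args": kwargs})

        async def _suspend():
            steps, ctx.pending = ctx.pending, []
            if not steps:
                # A sibling already reported the whole batch: park forever.
                await asyncio.get_running_loop().create_future()
            mode = "parallel" if len(steps) > 1 else "sequential"  # assumed label
            raise StepSuspend({"mode": mode, "steps": steps})

        return _suspend()

    return wrapper


@task
async def task_a(x: int):
    return x + 1


@task
async def task_b(x: int):
    return x * 2


async def main():
    # Both wrappers run while the argument list is built, before gather starts.
    return await asyncio.gather(task_a(x=1), task_b(x=2))


def first_run() -> dict:
    token = _ctx.set(Ctx())
    try:
        asyncio.run(main())
    except StepSuspend as s:
        return s.payload
    finally:
        _ctx.reset(token)
    return {"mode": "complete"}


print(first_run())
```

Because both dispatches are registered before either coroutine runs, the single suspend carries step_0 and step_1 together, which is what lets the backend create both child jobs at once.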

Context management

Uses contextvars.ContextVar — no global state. The @workflow decorator is just a marker; _run_workflow() sets the context var before running and resets it in finally.


WAC v2 Detection

Both executors detect WAC v2 scripts by content analysis:

| Language | Detection criteria |
|---|---|
| TypeScript | Contains workflow( AND task( AND windmill-client |
| Python | Contains one of @workflow / workflow(, AND one of @task / task(, AND one of import wmill / from wmill |

No entrypoint override (MAIN_OVERRIDE) — WAC scripts must use decorators.
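
Read literally, the criteria amount to substring checks. The sketch below mirrors them in Python for illustration only; the function names are hypothetical, and the actual checks live in the Rust executors and may differ in detail.

```python
def is_wac_v2_typescript(source: str) -> bool:
    """All three markers must appear somewhere in the script text."""
    return all(marker in source for marker in ("workflow(", "task(", "windmill-client"))


def is_wac_v2_python(source: str) -> bool:
    """One marker from each of the three groups must appear."""
    has_workflow = "@workflow" in source or "workflow(" in source
    has_task = "@task" in source or "task(" in source
    has_import = "import wmill" in source or "from wmill" in source
    return has_workflow and has_task and has_import


py_wac = "from wmill import workflow, task\n@task\nasync def f(): ...\n@workflow\nasync def main(): ...\n"
py_plain = "import wmill\ndef main():\n    return 1\n"
print(is_wac_v2_python(py_wac), is_wac_v2_python(py_plain))
```

Substring matching of this kind also matches markers that appear only in comments or string literals, so it is a heuristic rather than a parse.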


Files Changed

Backend (Rust)

  • windmill-worker/src/wac_executor.rs — WacCheckpoint, WacOutput (with InlineCheckpoint), load/save/step helpers
  • windmill-worker/src/bun_executor.rs — WAC detection, JS wrapper, handle_wac_v2_output (pub, shared), bundle cache bypass
  • windmill-worker/src/python_executor.rs — WAC detection, Python wrapper, checkpoint.json, calls shared handle_wac_v2_output
  • windmill-worker/src/result_processor.rs — handle_wac_child_completion
  • windmill-worker/src/worker.rs — WacSuspended handling
  • windmill-common/src/error.rs — WacSuspended error variant

TypeScript SDK

  • typescript-client/client.ts — WorkflowCtx, StepSuspend, task(), step(), workflow()

Python SDK

  • python-client/wmill/wmill/client.py — WorkflowCtx, _StepSuspend, @task, @workflow, step(), _run_workflow()
  • python-client/wmill/tests/test_workflow.py — 17 tests covering dispatch, replay, parallel, conditional, inline checkpoint, child mode

Frontend

  • frontend/src/lib/components/graph/WacGraph.svelte — execution visualization

Replace ctx.step("name", "script") API with @task decorators where
functions are called directly. Users no longer need to pass WorkflowCtx
or use string-based step names/script paths.

Python: @task decorator with contextvars-based implicit context
TypeScript: task() wrapper with module-level context variable
Parsers: detect @task function calls instead of ctx.step() calls
Worker: updated wrappers to set implicit context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

cloudflare-workers-and-pages bot commented Mar 1, 2026

Deploying windmill with Cloudflare Pages

Latest commit: cac05f6
Status: ✅  Deploy successful!
Preview URL: https://aa77e729.windmill.pages.dev
Branch Preview URL: https://workflows-as-code-v2.windmill.pages.dev


rubenfiszel and others added 10 commits March 6, 2026 05:00
- Rust-side orchestration: parent dispatches child jobs, suspends, resumes on completion
- _executing_key in checkpoint tells child which step to execute directly
- task() throws StepSuspend(mode="step_complete") after executing target step
- result_processor handles child completion and updates parent checkpoint
- WacGraph.svelte for runtime execution visualization
- Sequential and parallel workflows tested end-to-end

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# Conflicts:
#	backend/windmill-worker/src/lib.rs
#	frontend/src/lib/components/scriptEditor/LogPanel.svelte
#	python-client/wmill/wmill/client.py
#	typescript-client/package.json
- Disable bun bundle caching for WAC v2 scripts (wrapper needs
  windmill-client from node_modules, not available in bundle mode)
- Use Reflect.set/get(globalThis, "__wmill_wf_ctx") to share workflow
  context across dual module instances (wrapper vs user script)
- Never-resolving thenable for non-matching steps in child job mode
  prevents Promise.all race conditions
- Make description field optional in NewScript API (defaults to "")

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
step() executes a function inline (no child job) and persists the result
to the checkpoint. On replay, the cached value is returned — ensuring
deterministic behavior for non-deterministic operations like Date.now()
or Math.random().

- TypeScript: step(name, fn) — executes inline, throws StepSuspend with
  mode "inline_checkpoint" to persist before continuing
- Rust: InlineCheckpoint variant in WacOutput, saves to checkpoint and
  resets running=false for immediate re-pickup (no zombie wait)
- Shared step counter between task() and step() via _allocKey()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Python SDK: WorkflowCtx with _executing_key child mode, _alloc_key
  shared counter, _run_inline_step for step(), _execute_directly and
  _never_resolve for child mode, step() async function
- Python executor: WAC v2 detection, checkpoint.json writing, WAC
  wrapper.py generation calling _run_workflow(), post-execution hook
  into shared handle_wac_v2_output()
- Make handle_wac_v2_output pub so both bun and python executors share
  the same dispatch/suspend/inline-checkpoint logic
- 17 Python tests covering dispatch, replay, parallel, conditional,
  inline checkpoint, and child mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix type error: Tooltip doesn't accept text snippet, use Popover
- Extract shared helpers for task matching and block collection
- Replace linear tasks.find() with Map lookups
- Remove mutable module-level counter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
handle_python_job's async state machine was too large when combined
with handle_wac_v2_output. Box::pin heap-allocates the future.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The v2 @task decorator was shadowing the v1 one, breaking WAC v1
scripts that rely on HTTP-based dispatch via /workflow_as_code/ API.

The merged decorator handles three modes:
- v2: inside @workflow context → checkpoint/replay dispatch
- v1: WM_JOB_ID set, no @workflow → HTTP API dispatch + wait_job
- standalone: no Windmill env → execute function body directly

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>