Workflow as code v2 with TS and Python examples #8172
Draft
rubenfiszel wants to merge 11 commits into main from
Replace ctx.step("name", "script") API with @task decorators where
functions are called directly. Users no longer need to pass WorkflowCtx
or use string-based step names/script paths.
Python: @task decorator with contextvars-based implicit context
TypeScript: task() wrapper with module-level context variable
Parsers: detect @task function calls instead of ctx.step() calls
Worker: updated wrappers to set implicit context
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Deploying windmill with
| Latest commit: | cac05f6 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://aa77e729.windmill.pages.dev |
| Branch Preview URL: | https://workflows-as-code-v2.windmill.pages.dev |
- Rust-side orchestration: parent dispatches child jobs, suspends, resumes on completion
- _executing_key in checkpoint tells child which step to execute directly
- task() throws StepSuspend(mode="step_complete") after executing target step
- result_processor handles child completion and updates parent checkpoint
- WacGraph.svelte for runtime execution visualization
- Sequential and parallel workflows tested end-to-end
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# Conflicts:
# backend/windmill-worker/src/lib.rs
# frontend/src/lib/components/scriptEditor/LogPanel.svelte
# python-client/wmill/wmill/client.py
# typescript-client/package.json
- Disable bun bundle caching for WAC v2 scripts (wrapper needs windmill-client from node_modules, not available in bundle mode)
- Use Reflect.set/get(globalThis, "__wmill_wf_ctx") to share workflow context across dual module instances (wrapper vs user script)
- Never-resolving thenable for non-matching steps in child job mode prevents Promise.all race conditions
- Make description field optional in NewScript API (defaults to "")
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
step() executes a function inline (no child job) and persists the result to the checkpoint. On replay, the cached value is returned, ensuring deterministic behavior for non-deterministic operations like Date.now() or Math.random().
- TypeScript: step(name, fn) executes inline, throws StepSuspend with mode "inline_checkpoint" to persist before continuing
- Rust: InlineCheckpoint variant in WacOutput, saves to checkpoint and resets running=false for immediate re-pickup (no zombie wait)
- Shared step counter between task() and step() via _allocKey()
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Python SDK: WorkflowCtx with _executing_key child mode, _alloc_key shared counter, _run_inline_step for step(), _execute_directly and _never_resolve for child mode, step() async function
- Python executor: WAC v2 detection, checkpoint.json writing, WAC wrapper.py generation calling _run_workflow(), post-execution hook into shared handle_wac_v2_output()
- Make handle_wac_v2_output pub so both bun and python executors share the same dispatch/suspend/inline-checkpoint logic
- 17 Python tests covering dispatch, replay, parallel, conditional, inline checkpoint, and child mode
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix type error: Tooltip doesn't accept text snippet, use Popover
- Extract shared helpers for task matching and block collection
- Replace linear tasks.find() with Map lookups
- Remove mutable module-level counter
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
handle_python_job's async state machine was too large when combined with handle_wac_v2_output. Box::pin heap-allocates the future. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The v2 @task decorator was shadowing the v1 one, breaking WAC v1 scripts that rely on HTTP-based dispatch via /workflow_as_code/ API. The merged decorator handles three modes:
- v2: inside @workflow context → checkpoint/replay dispatch
- v1: WM_JOB_ID set, no @workflow → HTTP API dispatch + wait_job
- standalone: no Windmill env → execute function body directly
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Workflow-as-code v2: checkpoint/replay orchestration with task(), step(), and workflow() primitives for both TypeScript and Python. The parent script truly suspends (releases its worker slot entirely) while child jobs execute, then resumes to collect results. Proven to work on a single worker without deadlocking.
Two Primitives: task() vs step()
[Comparison table of task() and step(); surviving cell: Promise.all / asyncio.gather runs children concurrently]
The Replay Determinism Problem
Glue code between task() calls re-executes on every replay. Non-deterministic operations produce different values each time:
TypeScript:
Python:
step() solves this by executing inline and checkpointing:
TypeScript:
Python:
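As a rough illustration of the inline-checkpoint idea, here is a minimal, self-contained toy model. It does not use the wmill SDK: `Ctx`, `StepSuspend`, `step`, and `run_until_complete` are hypothetical stand-ins, the checkpoint is a plain dict rather than a DB row, and everything runs synchronously in one process.

```python
import random
import time


class StepSuspend(BaseException):
    """Toy suspend signal. BaseException, so `except Exception` cannot swallow it."""

    def __init__(self, key, result):
        super().__init__(key)
        self.key = key
        self.result = result


class Ctx:
    """Holds the persisted checkpoint and a per-run step counter."""

    def __init__(self, completed):
        self.completed = completed  # step key -> cached result
        self.index = 0

    def alloc_key(self):
        key = f"step_{self.index}"
        self.index += 1
        return key


def step(ctx, name, fn):
    """Return the cached value on replay; otherwise run fn once and suspend.

    `name` is only a human-readable label here; the cache key is positional.
    """
    key = ctx.alloc_key()
    if key in ctx.completed:
        return ctx.completed[key]
    raise StepSuspend(key, fn())


def run_until_complete(workflow):
    """Hypothetical driver: persist each checkpoint, then re-run from the top."""
    checkpoint = {}  # stands in for the persisted completed_steps
    runs = 0
    while True:
        runs += 1
        try:
            return workflow(Ctx(checkpoint)), runs, checkpoint
        except StepSuspend as s:
            checkpoint[s.key] = s.result  # saved before the next run starts


def my_workflow(ctx):
    ts = step(ctx, "timestamp", lambda: time.time())
    rid = step(ctx, "random_id", lambda: random.random())
    return {"ts": ts, "id": rid}


result, runs, checkpoint = run_until_complete(my_workflow)
print(runs, result["ts"] == checkpoint["step_0"])
```

In this sketch the workflow body runs three times (one suspend per uncached step, then a final replay), and the values returned at the end are the ones captured on the first execution of each step.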
Design Decision: Why Not Accumulate?
We considered accumulating inline step results in memory and flushing them when the next task() dispatches. This avoids the extra process re-invocation but has a correctness flaw: if the process crashes between the inline step and the next persistence point, the inline result is lost. On retry, Math.random() produces a different value, defeating the purpose.
Instead, step() persists immediately: it throws StepSuspend/_StepSuspend, exits the process, Rust saves the result to the DB, and re-runs. This guarantees the cached value survives crashes.
Design Decision: Why Not a Child Job?
Making every step() a child job would be correct but wildly expensive for Date.now() (microseconds of compute, seconds of job overhead). step() runs the function in the parent's process; the only overhead is one DB write and one process re-invocation (~30ms with cached modules).
Language Support
TypeScript (Bun)
- task(fn) wraps a function for child job dispatch
- workflow(fn) marks the entry point
- step(name, fn) executes inline with checkpoint
- Promise.all([taskA(), taskB()]) dispatches concurrent child jobs
- Never-resolving thenables for non-matching steps in child mode prevent Promise.all race conditions
- globalThis.__wmill_wf_ctx bridges the dual module instance problem (wrapper and user script import separate copies of windmill-client)
Python
- @task decorator marks functions for child job dispatch
- @workflow decorator marks the entry point
- step(name, fn): async function for inline checkpoint (same semantics as TypeScript)
- asyncio.gather(task_a(...), task_b(...)) dispatches concurrent child jobs
- Non-matching steps in child mode await never-resolving asyncio.Future() instances
- Context is held in a contextvars.ContextVar (no global state needed; Python's contextvars are naturally scoped)
Python-specific design notes
- The @task wrapper returns a coroutine synchronously (before await), so asyncio.gather() collects all pending dispatches before any coroutine starts running. When the first _suspend() coroutine executes, it collects all accumulated pending steps and raises _StepSuspend with mode "parallel".
- _StepSuspend inherits from BaseException (not Exception), so it is never caught by except Exception: blocks in user code.
- The generated wrapper calls wmill.client._run_workflow(fn, checkpoint, args), which returns a dict with a type field ("complete", "dispatch", or "inline_checkpoint"), matching the TypeScript output format exactly. The Rust handle_wac_v2_output() function handles both languages identically.
End-to-End Execution Flow (TypeScript Example)
Step-by-step execution
1. Parent job starts (1st bun execution)
The Rust worker detects WAC v2 (the script contains workflow(, task(, and windmill-client). It loads the checkpoint from v2_job_status.workflow_as_code_status._checkpoint; on first run it is empty: {completed_steps: {}}.
The wrapper creates WorkflowCtx(checkpoint), calls setWorkflowCtx(ctx) (which also sets globalThis.__wmill_wf_ctx for cross-module access), and runs the workflow.
When step("timestamp", () => Date.now()) is called:
- ctx._allocKey() returns step_0
- step_0 is not in completed_steps → execute Date.now() inline
- Throws StepSuspend({mode: "inline_checkpoint", key: "step_0", result: 1772781806614})
Wrapper catches it, writes {type: "inline_checkpoint", key: "step_0", result: 1772781806614} to result.json. Bun exits.
2. Rust handles inline checkpoint
handle_wac_v2_output() sees InlineCheckpoint:
- Saves step_0 → 1772781806614 to completed_steps
- Sets running = false on the job queue row; this makes the job immediately eligible for pickup without waiting for the zombie detector
- Returns Err(WacSuspended): worker skips completion, job stays in queue
3. Parent re-runs (2nd bun execution)
Worker picks up the job immediately. Checkpoint now has {completed_steps: {step_0: 1772781806614}}.
- step("timestamp", ...) → step_0 found in cache → returns 1772781806614 (not re-executed!)
- double(x) → step_1 not in cache → StepSuspend with dispatch. Wrapper writes {type: "dispatch", steps: [{key: "step_1", ...}]}. Rust creates child job, suspends parent with suspend = 1.
4. Child executes (3rd bun execution, child job)
Child loads checkpoint with _executing_key: "step_1". Runs workflow:
- step_0: in cache → replay
- step_1 (double): matches _executingKey → _execute_directly: true → runs x * 2 = 14 → throws StepSuspend({mode: "step_complete", result: 14})
Non-matching steps return never-resolving thenables to prevent Promise.all race conditions.
5. Result processor unsuspends parent
handle_wac_child_completion: adds step_1 → 14 to parent checkpoint, sets suspend = 0.
6. Parent re-runs (4th bun execution)
- step_0: replay (1772781806614).
- step_1: replay (14).
- step("random_id", ...) → step_2 not cached → executes, throws inline_checkpoint. Rust saves, re-runs.
7. Final run (5th bun execution)
All three steps replay from cache. Workflow returns {ts: 1772781806614, doubled: 14, id: "5rbt3fz326i"}.
Verified: the timestamp is 1772781806614 on ALL 5 runs, so deterministic replay works.
Proof: True Suspension (Single Worker)
Single worker, no deadlock. The parent truly releases its worker slot.
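The five executions in the walkthrough above (four parent runs plus one child run) can be mimicked with a small in-memory simulation. This is only a sketch under simplifying assumptions: it is synchronous, handles one dispatched task at a time (no parallel mode), and every name in it (`Suspend`, `Ctx`, `simulate`, and so on) is hypothetical rather than taken from the SDK or the Rust worker.

```python
class Suspend(BaseException):
    """Toy stand-in for StepSuspend; carries the mode, step key and result."""

    def __init__(self, mode, key, result=None):
        super().__init__(mode)
        self.mode, self.key, self.result = mode, key, result


class Ctx:
    def __init__(self, completed, executing_key=None):
        self.completed = completed          # step key -> cached result
        self.executing_key = executing_key  # set only in child mode
        self.index = 0

    def alloc_key(self):
        key = f"step_{self.index}"
        self.index += 1
        return key


def step(ctx, fn):
    key = ctx.alloc_key()
    if key in ctx.completed:
        return ctx.completed[key]                  # replay from cache
    raise Suspend("inline_checkpoint", key, fn())  # run inline, then persist


def task(ctx, fn, *args):
    key = ctx.alloc_key()
    if key in ctx.completed:
        return ctx.completed[key]                        # replay from cache
    if ctx.executing_key == key:
        raise Suspend("step_complete", key, fn(*args))   # child runs the body
    raise Suspend("dispatch", key)                       # parent requests a child job


executed = []  # records every real (non-replayed) run of the inline function


def stamp():
    executed.append("ts")
    return 1772781806614  # fixed value from the walkthrough keeps the demo reproducible


def double(x):
    return x * 2


def workflow(ctx):
    ts = step(ctx, stamp)
    doubled = task(ctx, double, 7)
    rid = step(ctx, lambda: "5rbt3fz326i")
    return {"ts": ts, "doubled": doubled, "id": rid}


def simulate(wf):
    """Re-run wf until it completes, logging one entry per process execution."""
    completed, log = {}, []
    while True:
        try:
            result = wf(Ctx(completed))
            log.append("parent:complete")
            return result, log
        except Suspend as s:
            log.append(f"parent:{s.mode}:{s.key}")
            if s.mode == "inline_checkpoint":
                completed[s.key] = s.result
            elif s.mode == "dispatch":
                # The parent has exited; the child replays the script in child mode.
                try:
                    wf(Ctx(dict(completed), executing_key=s.key))
                except Suspend as child:
                    log.append(f"child:{child.mode}:{child.key}")
                    completed[child.key] = child.result


result, log = simulate(workflow)
print(len(log), result)
```

Because the parent is re-entered from the top after every suspend, nothing in this loop ever blocks while a child is running, which is the property the single-worker claim relies on.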
Comparison with Other Approaches
[Comparison table not recoverable; surviving cells: workflow.random(), workflow.now() · step.run() · step.run() for everything · suspend column · step() for inline checkpoints]
vs Temporal
Temporal requires all workflow code to be deterministic (no Date.now(), no random, no I/O in glue code). Side effects must use workflow.sideEffect(). Our approach is similar but opt-in: step() is the equivalent of workflow.sideEffect(), and task() is the equivalent of workflow.executeActivity(). Key difference: no separate cluster needed.
vs WAC v1
v1 kept the parent process alive (await waitJob(jobId)). With N workers and a workflow dispatching N+ parallel tasks, v1 deadlocks. v2 exits the parent process; the suspend column is the only state.
Key Abstractions
Rust Backend (Shared by TypeScript and Python)
WacCheckpoint (wac_executor.rs)
WacOutput (wac_executor.rs)
Three possible outcomes from a process execution (bun or python):
- … task() calls
- … step() executed inline, persist and re-run
handle_wac_v2_output (bun_executor.rs, shared)
Routes the three output types. Called from both bun and python executors:
- … _executing_key, suspend parent (UPDATE SET suspend = N), return WacSuspended
- … running = false for immediate re-pickup, return WacSuspended
handle_wac_child_completion (result_processor.rs)
When a child completes: find step key, add result to parent checkpoint, decrement/clear suspend. On failure: fail parent immediately.
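To make the routing above more concrete, here is a hedged Python model of the state transitions. It is not the Rust code: `Job`, `handle_output`, and `handle_child_completion` are hypothetical names, the job row is a plain dataclass, and the `result` field on a "complete" output is an assumption (only the dispatch and inline_checkpoint shapes appear in this description).

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    """Toy stand-in for a parent job queue row plus its checkpoint."""
    completed_steps: dict = field(default_factory=dict)
    suspend: int = 0       # > 0 means hidden from workers until children finish
    running: bool = True
    done: bool = False
    failed: bool = False
    result: object = None


def handle_output(job, output):
    """Route one process output; return the step keys that need child jobs."""
    kind = output["type"]
    if kind == "complete":
        job.done, job.running = True, False
        job.result = output["result"]  # assumed field name
        return []
    if kind == "inline_checkpoint":
        job.completed_steps[output["key"]] = output["result"]
        job.running = False            # immediately eligible for re-pickup
        return []
    if kind == "dispatch":
        keys = [s["key"] for s in output["steps"]]
        job.suspend = len(keys)        # one unit per pending child
        return keys
    raise ValueError(f"unknown output type: {kind}")


def handle_child_completion(parent, key, result, ok=True):
    """Record a child's result on the parent, or fail the parent right away."""
    if not ok:
        parent.failed, parent.done = True, True
        return
    parent.completed_steps[key] = result
    parent.suspend = max(parent.suspend - 1, 0)


job = Job()
handle_output(job, {"type": "inline_checkpoint", "key": "step_0", "result": 1772781806614})
children = handle_output(job, {"type": "dispatch", "steps": [{"key": "step_1"}]})
handle_child_completion(job, children[0], 14)
print(job.suspend, job.completed_steps)
```

In this model the parent becomes runnable again only once `suspend` is back at 0, mirroring the decrement/clear behaviour described for handle_wac_child_completion.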
WacSuspended error (worker.rs)
Worker returns Ok(true): don't complete the job. For dispatch, parent has suspend > 0 (hidden from workers). For inline, parent has running = false (immediately re-eligible).
TypeScript SDK (client.ts)
WorkflowCtx
- stepIndex: shared counter for both task() and step() via _allocKey()
- completed: cache of step_key → result
- _executingKey: child mode, which step to execute directly
- _nextStep(): routing for task(): cache hit, execute-directly, never-resolve, or accumulate-and-throw
- _runInlineStep(): routing for step(): cache hit, never-resolve, or execute-and-throw
step(name, fn)
Execute inline, checkpoint result. Three paths:
- … fn(), throw StepSuspend({mode: "inline_checkpoint", key, result})
task(fn)
Wrap function for child job dispatch. In child mode with matching key: run fn(), throw StepSuspend({mode: "step_complete"}).
globalThis context sharing
Reflect.set(globalThis, "__wmill_wf_ctx", ctx) bridges the dual module instance problem (wrapper and user script import separate copies of windmill-client). Reflect.set/get survives tree-shaking by rolldown/tsdown.
Python SDK (client.py)
WorkflowCtx
- _step_index: shared counter for both task() and step() via _alloc_key()
- _completed: cache of step_key → result
- _executing_key: child mode, which step to execute directly
- _next_step(): routing for @task: cache hit, execute-directly, never-resolve, or accumulate-and-throw
- _run_inline_step(): routing for step(): cache hit, never-resolve, or execute-and-throw
step(name, fn) (async function)
Same semantics as TypeScript. Three paths:
- … await asyncio.Future() (never resolves)
- … fn(), raise _StepSuspend({"mode": "inline_checkpoint", ...})
@task decorator
Marks async function for child job dispatch. The wrapper returns a coroutine synchronously, enabling asyncio.gather() to collect all pending tasks before any suspend. In child mode with matching key: calls function body directly, raises _StepSuspend({"mode": "step_complete"}).
Context management
Uses contextvars.ContextVar, so no global state is needed. The @workflow decorator is just a marker; _run_workflow() sets the context var before running and resets it in finally.
WAC v2 Detection
Both executors detect WAC v2 scripts by content analysis:
- TypeScript: workflow( AND task( AND windmill-client
- Python: (@workflow or workflow() AND (@task or task() AND (import wmill or from wmill)
No entrypoint override (MAIN_OVERRIDE): WAC scripts must use decorators.
Files Changed
Backend (Rust)
- windmill-worker/src/wac_executor.rs: WacCheckpoint, WacOutput (with InlineCheckpoint), load/save/step helpers
- windmill-worker/src/bun_executor.rs: WAC detection, JS wrapper, handle_wac_v2_output (pub, shared), bundle cache bypass
- windmill-worker/src/python_executor.rs: WAC detection, Python wrapper, checkpoint.json, calls shared handle_wac_v2_output
- windmill-worker/src/result_processor.rs: handle_wac_child_completion
- windmill-worker/src/worker.rs: WacSuspended handling
- windmill-common/src/error.rs: WacSuspended error variant
TypeScript SDK
- typescript-client/client.ts: WorkflowCtx, StepSuspend, task(), step(), workflow()
Python SDK
- python-client/wmill/wmill/client.py: WorkflowCtx, _StepSuspend, @task, @workflow, step(), _run_workflow()
- python-client/wmill/tests/test_workflow.py: 17 tests covering dispatch, replay, parallel, conditional, inline checkpoint, child mode
Frontend
- frontend/src/lib/components/graph/WacGraph.svelte: execution visualization