Add TaskReadinessGate plugin extension point#7158
Conversation
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
…path Signed-off-by: Rob Syme <[email protected]>
…n routing TaskProcessor.resumeOrDie inspects error.cause for ProcessRetryableException and CloudSpotTerminationException markers. Rethrowing a RuntimeException implementing ProcessRetryableException as-is would lose retry routing because the marker would end up as `error`, not `error.cause`. Restore the original plan's wrap-all-non-ProcessException semantics and add tests covering retry marker routing and peer-future cancellation. Also fix a Groovy for-loop closure-capture issue in schedule() that caused multi-gate submissions to invoke the wrong gate, surfaced by the new peer-cancellation test. Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
…canSubmit reorder Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
Signed-off-by: Rob Syme <[email protected]>
pditommaso
left a comment
There was a problem hiding this comment.
Thanks for picking this up. The shape is close, but I'd like three changes before we merge.
1. Move gate orchestration into its own manager class
TaskPollingMonitor is already ~930 lines of subtle concurrency, and this PR adds ~80 lines spread across new fields (readinessGates, gateExecutor, gateStates, gateMaxWait), the GateState inner class, the allGatesReady method, plus touch points in start, schedule, canSubmit, evict, and a constructor reshuffle.
I'd like the gate concern extracted into a TaskGateManager (or similar) that owns the gates list, the virtual-thread executor, the future map, and the timeout policy. The monitor diff then collapses to ~4 one-liners: construct the manager in start(), manager.submit(handler) in schedule, manager.isReady(handler) in canSubmit, manager.evict(handler) in evict. The constructor lock-init change should then stand or fall on its own merit, separate from the gate work.
2. Replace executor.gateMaxWait with a hints entry — let the plugin own the timeout
Drop gateMaxWait from core entirely. The plugin (e.g. Glacier) reads its own per-process hint — hints 'glacier/maxWait': '5h' — and enforces its deadline itself, throwing through prepare. This keeps the core SPI minimal (the original prepare(handler) interface only), puts timeout policy where it's actually understood, and gives natural per-process scoping. A plugin that ignores both interrupts and its own deadline can hang slots — but a plugin that ignores interrupts already breaks eviction, so the core safety net was always partial.
3. If any gateMaxWait-style knob survives, default it to ~5 min, not 24h
24h is functionally "no limit" — a stuck gate burns a full day of slot before surfacing. A 5-min default forces realistic waits (Glacier Standard ~5h, Deep Archive Bulk ~48h) to be explicit and opt in, which is more discoverable and fails fast in tests/dev. Moot if #2 lands and the executor-level knob is removed.
Summary
Implements the
TaskReadinessGateplugin extension point described in ADRadr/20260516-task-readiness-gate.md(#7151), using the blocking-preparedesign that emerged from review of that ADR.Plugins implement
void prepare(TaskHandler) throws InterruptedException. The method may block freely; core runs each registered gate on a managed virtual-thread executor and polls the resultingFuturefromcanSubmit. Throwing routes through the existingerrorStrategymachinery insubmitPendingTasks. Anexecutor.gateMaxWaitconfig option (default24h) bounds wait time as a safety net for stuck gates.Behavior is bit-identical when no plugin registers a gate.
Why this shape
The ADR originally proposed a polling
boolean isReady(TaskRun)contract that required plugin authors to manage their own async state and "return promptly." Paolo's review of the ADR (#7151) suggested a blockingpreparewith core owning the async runtime — smaller SPI, fewer footguns for plugin authors, noAbstractAsyncReadinessGatehelper needed. This PR implements that variant.What's in the PR
nextflow.processor.TaskReadinessGate— new SPI interface (org.pf4j extension).TaskPollingMonitor— three integration points:start(): resolves gates viaPlugins.getExtensions(TaskReadinessGate)and lazily creates a virtual-thread executor (no overhead when no gates registered).schedule(): submits each gate'sprepare(handler)to the managed executor immediately on enqueue, so async work starts before any executor slot frees up.canSubmit(): AND-chain reordered to checkallGatesReadyfirst; futures polled viaisDone, exceptions unwrapped, peer futures cancelled on failure,gateMaxWaitenforced.evict(): cancels in-flight gate futures so background work doesn't outlive the task.ExecutorConfig.gateMaxWait— newDurationconfig option, default24h(covers all Glacier Standard/Bulk restore tiers; Deep Archive Bulk users need to bump).ProcessRetryableExceptionmarker routing througherror.cause), checked-exception wrapping, external cancellation, multi-gate AND semantics, fail-fast peer cancellation,gateMaxWaittimeout, eviction cancellation.docs/developer/task-readiness-gate.md) and config reference entry.Notable side-effect change
TaskPollingMonitor's lock/condition fields (pendingLock,taskCompleteLock,taskAvail,slotAvail,taskComplete) are now initialised in the constructor rather thanstart(). The previous shape left thesenulluntilstart()was called, which made the new gate-submission code inschedule()unsafe to exercise from property-style construction (used by Spock tests). The change is safe becauseExecutor.init → createTaskMonitor → startis the only production path and lock initialisation has no dependency onstart's side effects.What's out of scope
AbstractAsyncReadinessGatehelper class — not needed under this design (gates write blocking code directly).hintsdirective; plugins inspecthandler.task.config.hintsand short-circuit. No new directive required.Test plan
:nextflow:testgreenTaskPollingMonitorTest,ParallelPollingMonitorTest,ExecutorConfigTest,TaskPollingMonitorReadinessGateTest(new) all pass locally./gradlew :nextflow:test --tests 'nextflow.processor.*' --tests 'nextflow.executor.ExecutorConfigTest'Links
adr/20260516-task-readiness-gate.md(merged as Add ADR for TaskReadinessGate plugin extension point #7151)🤖 Generated with Claude Code