Skip to content

Add TaskReadinessGate plugin extension point#7158

Open
robsyme wants to merge 19 commits into
masterfrom
feature/task-readiness-gate
Open

Add TaskReadinessGate plugin extension point#7158
robsyme wants to merge 19 commits into
masterfrom
feature/task-readiness-gate

Conversation

@robsyme
Copy link
Copy Markdown
Collaborator

@robsyme robsyme commented May 19, 2026

Summary

Implements the TaskReadinessGate plugin extension point described in ADR adr/20260516-task-readiness-gate.md (#7151), using the blocking-prepare design that emerged from review of that ADR.

Plugins implement void prepare(TaskHandler) throws InterruptedException. The method may block freely; core runs each registered gate on a managed virtual-thread executor and polls the resulting Future from canSubmit. Throwing routes through the existing errorStrategy machinery in submitPendingTasks. An executor.gateMaxWait config option (default 24h) bounds wait time as a safety net for stuck gates.

Behavior is bit-identical when no plugin registers a gate.

Why this shape

The ADR originally proposed a polling boolean isReady(TaskRun) contract that required plugin authors to manage their own async state and "return promptly." Paolo's review of the ADR (#7151) suggested a blocking prepare with core owning the async runtime — smaller SPI, fewer footguns for plugin authors, no AbstractAsyncReadinessGate helper needed. This PR implements that variant.

What's in the PR

  • nextflow.processor.TaskReadinessGate — new SPI interface (org.pf4j extension).
  • TaskPollingMonitor — three integration points:
    • start(): resolves gates via Plugins.getExtensions(TaskReadinessGate) and lazily creates a virtual-thread executor (no overhead when no gates registered).
    • schedule(): submits each gate's prepare(handler) to the managed executor immediately on enqueue, so async work starts before any executor slot frees up.
    • canSubmit(): AND-chain reordered to check allGatesReady first; futures polled via isDone, exceptions unwrapped, peer futures cancelled on failure, gateMaxWait enforced.
    • evict(): cancels in-flight gate futures so background work doesn't outlive the task.
  • ExecutorConfig.gateMaxWait — new Duration config option, default 24h (covers all Glacier Standard/Bulk restore tiers; Deep Archive Bulk users need to bump).
  • Spock tests covering: no-gates zero-cost path, happy path, ProcessException identity preservation, RuntimeException wrapping (including ProcessRetryableException marker routing through error.cause), checked-exception wrapping, external cancellation, multi-gate AND semantics, fail-fast peer cancellation, gateMaxWait timeout, eviction cancellation.
  • Developer docs (docs/developer/task-readiness-gate.md) and config reference entry.

Notable side-effect change

TaskPollingMonitor's lock/condition fields (pendingLock, taskCompleteLock, taskAvail, slotAvail, taskComplete) are now initialised in the constructor rather than start(). The previous shape left these null until start() was called, which made the new gate-submission code in schedule() unsafe to exercise from property-style construction (used by Spock tests). The change is safe because Executor.init → createTaskMonitor → start is the only production path and lock initialisation has no dependency on start's side effects.

What's out of scope

  • The first consumer plugin (Glacier restore for AWS Batch tasks) lives in a separate repository and will adopt this SPI in a follow-up.
  • AbstractAsyncReadinessGate helper class — not needed under this design (gates write blocking code directly).
  • Per-process gate scoping — covered by the existing hints directive; plugins inspect handler.task.config.hints and short-circuit. No new directive required.
  • Gate ordering / priority — gates run in parallel; order across gates is unspecified.

Test plan

  • CI: full :nextflow:test green
  • TaskPollingMonitorTest, ParallelPollingMonitorTest, ExecutorConfigTest, TaskPollingMonitorReadinessGateTest (new) all pass locally
  • Verified locally with ./gradlew :nextflow:test --tests 'nextflow.processor.*' --tests 'nextflow.executor.ExecutorConfigTest'
  • No behavioral change when no plugin registers a gate

Links

🤖 Generated with Claude Code

robsyme added 19 commits May 19, 2026 16:53
…n routing

TaskProcessor.resumeOrDie inspects error.cause for ProcessRetryableException
and CloudSpotTerminationException markers. Rethrowing a RuntimeException
implementing ProcessRetryableException as-is would lose retry routing because
the marker would end up as `error`, not `error.cause`. Restore the original
plan's wrap-all-non-ProcessException semantics and add tests covering retry
marker routing and peer-future cancellation.

Also fix a Groovy for-loop closure-capture issue in schedule() that caused
multi-gate submissions to invoke the wrong gate, surfaced by the new
peer-cancellation test.

Signed-off-by: Rob Syme <[email protected]>
@robsyme robsyme requested a review from a team as a code owner May 19, 2026 21:52
Copy link
Copy Markdown
Member

@pditommaso pditommaso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up. The shape is close, but I'd like three changes before we merge.

1. Move gate orchestration into its own manager class

TaskPollingMonitor is already ~930 lines of subtle concurrency, and this PR adds ~80 lines spread across new fields (readinessGates, gateExecutor, gateStates, gateMaxWait), the GateState inner class, the allGatesReady method, plus touch points in start, schedule, canSubmit, evict, and a constructor reshuffle.

I'd like the gate concern extracted into a TaskGateManager (or similar) that owns the gates list, the virtual-thread executor, the future map, and the timeout policy. The monitor diff then collapses to ~4 one-liners: construct the manager in start(), manager.submit(handler) in schedule, manager.isReady(handler) in canSubmit, manager.evict(handler) in evict. The constructor lock-init change should then stand or fall on its own merit, separate from the gate work.

2. Replace executor.gateMaxWait with a hints entry — let the plugin own the timeout

Drop gateMaxWait from core entirely. The plugin (e.g. Glacier) reads its own per-process hint — hints 'glacier/maxWait': '5h' — and enforces its deadline itself, throwing through prepare. This keeps the core SPI minimal (the original prepare(handler) interface only), puts timeout policy where it's actually understood, and gives natural per-process scoping. A plugin that ignores both interrupts and its own deadline can hang slots — but a plugin that ignores interrupts already breaks eviction, so the core safety net was always partial.

3. If any gateMaxWait-style knob survives, default it to ~5 min, not 24h

24h is functionally "no limit" — a stuck gate burns a full day of slot before surfacing. A 5-min default forces realistic waits (Glacier Standard ~5h, Deep Archive Bulk ~48h) to be explicit and opt in, which is more discoverable and fails fast in tests/dev. Moot if #2 lands and the executor-level knob is removed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants