Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/7134.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove without spawning option, needed for parentless tasks, has been reinstated.
11 changes: 9 additions & 2 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def _remove_matched_tasks(
ids: Set[TaskTokens],
flow_nums: 'FlowNums',
warn_unremovable: bool = True,
no_spawn: bool = False,
):
"""Remove matched tasks."""
# Mapping of *relative* task IDs to removed flow numbers:
Expand All @@ -183,6 +184,7 @@ def _remove_matched_tasks(
# Need to remove the task from the pool.
# Spawn next occurrence of xtrigger sequential task (otherwise
# this would not happen after removing this occurrence):
itask.no_spawn = no_spawn
schd.pool.check_spawn_psx_task(itask)
schd.pool.remove(itask, 'request')
to_kill.append(itask)
Expand Down Expand Up @@ -493,13 +495,17 @@ async def set_verbosity(schd: 'Scheduler', level: 'Enum'):

@_command('remove_tasks')
async def remove_tasks(
schd: 'Scheduler', tasks: Iterable[str], flow: List[str]
schd: 'Scheduler',
tasks: Iterable[str],
flow: List[str],
no_spawn: bool = False
):
"""Match and remove tasks (`cylc remove` command).

Args:
tasks: Relative IDs or globs to match.
flow: flows to remove the tasks from.
no_spawn: Do not spawn successors before removal.
"""
flow = back_compat_flow_all(flow) # BACK COMPAT (see func def)
ids = validate.is_tasks(tasks)
Expand All @@ -512,7 +518,8 @@ async def remove_tasks(
_remove_matched_tasks(
schd,
matched,
schd.pool.flow_mgr.cli_to_flow_nums(flow)
schd.pool.flow_mgr.cli_to_flow_nums(flow),
no_spawn=no_spawn,
)


Expand Down
15 changes: 15 additions & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,21 @@ class Arguments(TaskMutation.Arguments):
By default, tasks will be removed from all flows.
''')
)
no_spawn = Boolean(
default_value=False,
description=sstrip('''
"Do not spawn successors before removal."

This usually only applies to parentless tasks whose next
instance(s) are either spawned automatically to the runahead
limit or sequentially on xtrigger satisfaction.

If `false` the scheduler will spawn the next task instance
as normal (default).

If `true` the scheduler will not spawn the next task instance.
''')
)


class SetPrereqsAndOutputs(Mutation, TaskMutation):
Expand Down
10 changes: 9 additions & 1 deletion cylc/flow/scripts/remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
$wFlows: [WorkflowID]!,
$tasks: [NamespaceIDGlob]!,
$flow: [Flow!],
$noSpawn: Boolean,
) {
remove (
workflows: $wFlows,
tasks: $tasks,
flow: $flow
flow: $flow,
noSpawn: $noSpawn
) {
result
}
Expand All @@ -91,7 +93,12 @@ def get_option_parser() -> COP:
multiworkflow=True,
argdoc=[FULL_ID_MULTI_ARG_DOC],
)

add_flow_opts_for_remove(parser)
parser.add_option(
"--no-spawn",
help="Do not spawn successors before removal.",
action="store_true", default=False, dest="no_spawn")
return parser


Expand All @@ -107,6 +114,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
for tokens in tokens_list
],
'flow': options.flow,
'noSpawn': options.no_spawn,
}
}

Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
# xtriggers are no longer relevant -> remove them
self.xtrigger_mgr.force_satisfy_all(itask, log=False)

if itask.state.is_runahead and itask.flow_nums:
if itask.state.is_runahead and itask.flow_nums and not itask.no_spawn:
# If removing a parentless runahead-limited task
# auto-spawn its next instance first.
self.spawn_if_parentless(
Expand Down Expand Up @@ -2436,6 +2436,7 @@ def check_spawn_psx_task(self, itask: 'TaskProxy') -> None:
itask.identity not in
self.xtrigger_mgr.sequential_has_spawned_next
)
and not itask.no_spawn
):
self.xtrigger_mgr.sequential_has_spawned_next.add(
itask.identity
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class TaskProxy:
.removed:
A flag to indicate this task has been removed by command (used
e.g. to disable failed/submit-failed event handlers).
.no_spawn:
A flag to indicate whether this task should spawn a successor
(usually only applies to parentless tasks).

Args:
tdef: The definition object of this task.
Expand Down Expand Up @@ -212,6 +215,7 @@ class TaskProxy:
'transient',
'is_xtrigger_sequential',
'removed',
'no_spawn',
)

def __init__(
Expand Down Expand Up @@ -288,6 +292,7 @@ def __init__(
self.is_late = is_late
self.waiting_on_job_prep = False
self.removed: bool = False
self.no_spawn: bool = False

self.state = TaskState(tdef, self.point, status, is_held)

Expand Down
44 changes: 44 additions & 0 deletions tests/integration/test_remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,3 +545,47 @@ async def test_remove_triggered(flow, scheduler, start):
)
assert not schd.pool.get_tasks()
assert not schd.pool.tasks_to_trigger_now


async def test_remove_no_spawn(flow, scheduler, start):
"""Test the no-spawn removal of parentless tasks."""
schd: Scheduler = scheduler(
flow({
'scheduler': {
'cycle point format': 'CCYY',
},
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P0',
'graph': {
'R3//P1Y': '''
@wall_clock => a
b
c
''',
},
},
})
)
async with start(schd):
assert schd.pool.get_task_ids() == {
'2000/a',
'2000/b',
'2000/c',
'2001/b',
'2001/c',
}

# normal removal of parentless runahead and sequential xtrigger
# spawned tasks
await run_cmd(remove_tasks(schd, ['2000/a', '2000/b', '2001/b'], []))
assert schd.pool.get_task_ids() == {
'2000/c',
'2001/a',
'2001/c',
'2002/b',
}

# no spawn removal
await run_cmd(remove_tasks(schd, ['2001/a', '2001/c'], [], True))
assert schd.pool.get_task_ids() == {'2000/c', '2002/b'}
Loading