Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ def run(self, graph, config, updatehash=False):
"""
Executes a pre-defined pipeline using distributed approaches
"""
# Ensure only current run errors are reported
self._run_errors = []

logger.info("Running in parallel.")
self._config = config
poll_sleep_secs = float(config["execution"]["poll_sleep_duration"])
Expand All @@ -136,7 +139,6 @@ def run(self, graph, config, updatehash=False):
self.mapnodesubids = {}
# setup polling - TODO: change to threaded model
notrun = []
errors = []

old_progress_stats = None
old_presub_stats = None
Expand Down Expand Up @@ -171,14 +173,14 @@ def run(self, graph, config, updatehash=False):
result = self._get_result(taskid)
except Exception as exc:
notrun.append(self._clean_queue(jobid, graph))
errors.append(exc)
self._run_errors.append(exc)
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
errors.append("".join(result["traceback"]))
self._run_errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
self._remove_node_dirs()
Expand Down Expand Up @@ -210,18 +212,19 @@ def run(self, graph, config, updatehash=False):
# close any open resources
self._postrun_check()

if errors:
# If one or more nodes failed, re-rise first of them
error, cause = errors[0], None
if isinstance(error, str):
if self._run_errors:
# If one or more nodes failed, re-raise first of them
error, cause = self._run_errors[0], None
if isinstance(
error, (str, list)
): # Error can also be a list of strings (traceback)
error = RuntimeError(error)

if len(errors) > 1:
if len(self._run_errors) > 1:
error, cause = (
RuntimeError(f"{len(errors)} raised. Re-raising first."),
RuntimeError(f"{len(self._run_errors)} raised. Re-raising first."),
error,
)

raise error from cause

def _get_result(self, taskid):
Expand Down
1 change: 1 addition & 0 deletions nipype/pipeline/plugins/legacymultiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self.procs[jobid].run(updatehash=updatehash)
except Exception:
traceback = format_exception(*sys.exc_info())
self._run_errors.append(traceback)
self._clean_queue(
jobid, graph, result={"result": None, "traceback": traceback}
)
Expand Down
2 changes: 2 additions & 0 deletions nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
free_gpu_slots,
self.n_gpu_procs,
)

if self._stats != stats:
tasks_list_msg = ""

Expand Down Expand Up @@ -389,6 +390,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self.procs[jobid].run(updatehash=updatehash)
except Exception:
traceback = format_exception(*sys.exc_info())
self._run_errors.append(traceback)
self._clean_queue(
jobid, graph, result={"result": None, "traceback": traceback}
)
Expand Down
24 changes: 24 additions & 0 deletions nipype/pipeline/plugins/tests/test_multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ def _list_outputs(self):
return outputs


class ErrorInterface(SingleNodeTestInterface):
    """Test interface whose execution always fails.

    Used to exercise the plugins' error-collection/re-raise paths: running a
    node with this interface is guaranteed to raise a ``RuntimeError``.
    """

    def _run_interface(self, runtime):
        # Fail unconditionally; the message text is asserted nowhere, but is
        # kept identical to the original for byte-for-byte behavior.
        message = "This is an error"
        raise RuntimeError(message)


def test_no_more_memory_than_specified(tmpdir):
tmpdir.chdir()
pipe = pe.Workflow(name="pipe")
Expand Down Expand Up @@ -157,3 +162,22 @@ def test_hold_job_until_procs_available(tmpdir):

max_threads = 2
pipe.run(plugin="MultiProc", plugin_args={"n_procs": max_threads})


@pytest.mark.parametrize("plugin", ["MultiProc", "LegacyMultiProc"])
def test_error_run_without_submitting(tmp_path, plugin):
    """An exception raised by a ``run_without_submitting=True`` node must be
    surfaced as a ``RuntimeError`` from ``Workflow.run``.

    Regression test: errors raised on the "run without submitting" path are
    recorded (in ``_run_errors``) and re-raised after the run, instead of
    being silently swallowed.
    """
    wf = pe.Workflow(name='rws', base_dir=str(tmp_path))
    n1 = pe.Node(SingleNodeTestInterface(), name='n1')
    n1.inputs.input1 = 1
    # n2 always raises; run_without_submitting routes it through the
    # plugin's in-process execution path rather than a worker.
    n2 = pe.Node(ErrorInterface(), name='n2', run_without_submitting=True)
    n3 = pe.Node(SingleNodeTestInterface(), name='n3')

    # Fixed: the original had a stray trailing comma after this call,
    # turning the statement into a pointless one-element tuple expression.
    wf.connect(
        [
            (n1, n2, [('output1', 'input1')]),
            (n2, n3, [('output1', 'input1')]),
        ]
    )

    with pytest.raises(RuntimeError):
        wf.run(plugin=plugin)
Loading