Skip to content
4 changes: 1 addition & 3 deletions nipype/interfaces/base/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
runtime.stderr = None
runtime.cmdline = self.cmdline
runtime.environ.update(out_environ)
runtime.success_codes = correct_return_codes

# which $cmd
executable_name = shlex.split(self._cmd_prefix + self.cmd)[0]
Expand All @@ -742,9 +743,6 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
else "<skipped>"
)
runtime = run_command(runtime, output=self.terminal_output)
if runtime.returncode is None or runtime.returncode not in correct_return_codes:
self.raise_exception(runtime)

return runtime

def _format_arg(self, name, trait_spec, value):
Expand Down
7 changes: 7 additions & 0 deletions nipype/interfaces/base/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ def __exit__(self, exc_type, exc_value, exc_tb):
if self._ignore_exc:
return True

if hasattr(self._runtime, "cmdline"):
retcode = self._runtime.returncode
if retcode not in self._runtime.success_codes:
self._runtime.traceback = (
f"RuntimeError: subprocess exited with code {retcode}."
)

@property
def runtime(self):
return self._runtime
Expand Down
24 changes: 5 additions & 19 deletions nipype/interfaces/matlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,11 @@


def get_matlab_command():
if "NIPYPE_NO_MATLAB" in os.environ:
return None

try:
matlab_cmd = os.environ["MATLABCMD"]
except:
matlab_cmd = "matlab"

try:
res = CommandLine(
command="which",
args=matlab_cmd,
resource_monitor=False,
terminal_output="allatonce",
).run()
matlab_path = res.runtime.stdout.strip()
except Exception:
return None
return matlab_cmd
"""Determine whether Matlab is installed and can be executed."""
if "NIPYPE_NO_MATLAB" not in os.environ:
from nipype.utils.filemanip import which

return which(os.getenv("MATLABCMD", "matlab"))


no_matlab = get_matlab_command() is None
Expand Down
2 changes: 1 addition & 1 deletion nipype/interfaces/tests/test_matlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_run_interface(tmpdir):
# bypasses ubuntu dash issue
mc = mlab.MatlabCommand(script="foo;", paths=[tmpdir.strpath], mfile=True)
assert not os.path.exists(default_script_file), "scriptfile should not exist 4."
with pytest.raises(RuntimeError):
with pytest.raises(OSError):
mc.run()
assert os.path.exists(default_script_file), "scriptfile should exist 4."
if os.path.exists(default_script_file): # cleanup
Expand Down
2 changes: 1 addition & 1 deletion nipype/interfaces/utility/tests/test_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def should_fail(tmp):


def test_should_fail(tmpdir):
with pytest.raises(NameError):
with pytest.raises(pe.nodes.NodeExecutionError):
should_fail(tmpdir)


Expand Down
107 changes: 44 additions & 63 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import os
import os.path as op
from pathlib import Path
import shutil
import socket
from copy import deepcopy
Expand All @@ -30,7 +31,6 @@
load_json,
emptydirs,
savepkl,
indirectory,
silentrm,
)

Expand Down Expand Up @@ -64,6 +64,10 @@
logger = logging.getLogger("nipype.workflow")


class NodeExecutionError(RuntimeError):
    """A nipype-specific name for exceptions when executing a Node.

    Subclasses :class:`RuntimeError` so existing callers catching
    ``RuntimeError`` keep working.  Raised when a node's interface reports a
    traceback after running (``Node._run_command``), when a source node's
    results file contains no outputs (``Node._get_inputs``), and when one or
    more subnodes of a ``MapNode`` fail (``MapNode._collate_results``).
    """


class Node(EngineBase):
"""
Wraps interface objects for use in pipeline
Expand Down Expand Up @@ -98,7 +102,7 @@ def __init__(
run_without_submitting=False,
n_procs=None,
mem_gb=0.20,
**kwargs
**kwargs,
):
"""
Parameters
Expand Down Expand Up @@ -439,7 +443,8 @@ def run(self, updatehash=False):
)

# Check hash, check whether run should be enforced
logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir)
if not isinstance(self, MapNode):
logger.info(f'[Node] Setting-up "{self.fullname}" in "{outdir}".')
cached, updated = self.is_cached()

# If the node is cached, check on pklz files and finish
Expand Down Expand Up @@ -530,7 +535,6 @@ def run(self, updatehash=False):
# Tear-up after success
shutil.move(hashfile_unfinished, hashfile_unfinished.replace("_unfinished", ""))
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
logger.info('[Node] Finished "%s".', self.fullname)
return result

def _get_hashval(self):
Expand Down Expand Up @@ -582,7 +586,7 @@ def _get_inputs(self):
logger.critical("%s", e)

if outputs is None:
raise RuntimeError(
raise NodeExecutionError(
"""\
Error populating the inputs of node "%s": the results file of the source node \
(%s) does not contain any outputs."""
Expand Down Expand Up @@ -697,79 +701,56 @@ def _run_command(self, execute, copyfiles=True):
)
return result

outdir = self.output_dir()
# Run command: either execute is true or load_results failed.
result = InterfaceResult(
interface=self._interface.__class__,
runtime=Bunch(
cwd=outdir,
returncode=1,
environ=dict(os.environ),
hostname=socket.gethostname(),
),
inputs=self._interface.inputs.get_traitsfree(),
)

outdir = Path(self.output_dir())
if copyfiles:
self._originputs = deepcopy(self._interface.inputs)
self._copyfiles_to_wd(execute=execute)

message = '[Node] Running "{}" ("{}.{}")'.format(
self.name, self._interface.__module__, self._interface.__class__.__name__
# Run command: either execute is true or load_results failed.
logger.info(
f'[Node] Executing "{self.name}" <{self._interface.__module__}'
f".{self._interface.__class__.__name__}>"
)
# Invoke core run method of the interface ignoring exceptions
result = self._interface.run(cwd=outdir, ignore_exception=True)
logger.info(
f'[Node] Finished "{self.name}", elapsed time {result.runtime.duration}s.'
)

if issubclass(self._interface.__class__, CommandLine):
try:
with indirectory(outdir):
cmd = self._interface.cmdline
except Exception as msg:
result.runtime.stderr = "{}\n\n{}".format(
getattr(result.runtime, "stderr", ""), msg
)
_save_resultfile(
result,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
)
raise
cmdfile = op.join(outdir, "command.txt")
with open(cmdfile, "wt") as fd:
print(cmd + "\n", file=fd)
message += ", a CommandLine Interface with command:\n{}".format(cmd)
logger.info(message)
try:
result = self._interface.run(cwd=outdir)
except Exception as msg:
result.runtime.stderr = "%s\n\n%s".format(
getattr(result.runtime, "stderr", ""), msg
)
_save_resultfile(
result,
# Write out command line as it happened
Path.write_text(outdir / "command.txt", f"{result.runtime.cmdline}\n")

exc_tb = getattr(result.runtime, "traceback", None)

if not exc_tb:
# Clean working directory if no errors
dirs2keep = None
if isinstance(self, MapNode):
dirs2keep = [op.join(outdir, "mapflow")]

result.outputs = clean_working_directory(
result.outputs,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
self._interface.inputs,
self.needed_outputs,
self.config,
dirs2keep=dirs2keep,
)
raise

dirs2keep = None
if isinstance(self, MapNode):
dirs2keep = [op.join(outdir, "mapflow")]

result.outputs = clean_working_directory(
result.outputs,
outdir,
self._interface.inputs,
self.needed_outputs,
self.config,
dirs2keep=dirs2keep,
)
# Store results file under all circumstances
_save_resultfile(
result,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
)

if exc_tb:
raise NodeExecutionError(
f"Exception raised while executing Node {self.name}.\n\n{result.runtime.traceback}"
)

return result

def _copyfiles_to_wd(self, execute=True, linksonly=False):
Expand Down Expand Up @@ -1290,7 +1271,7 @@ def _collate_results(self, nodes):
if code is not None:
msg += ["Subnode %d failed" % i]
msg += ["Error: %s" % str(code)]
raise Exception(
raise NodeExecutionError(
"Subnodes of node: %s failed:\n%s" % (self.name, "\n".join(msg))
)

Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/engine/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def test_mapnode_crash(tmpdir):
node.config = deepcopy(config._sections)
node.config["execution"]["stop_on_first_crash"] = True
node.base_dir = tmpdir.strpath
with pytest.raises(TypeError):
with pytest.raises(pe.nodes.NodeExecutionError):
node.run()
os.chdir(cwd)

Expand Down
21 changes: 19 additions & 2 deletions nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(self, graph, config, updatehash=False):
self.mapnodesubids = {}
# setup polling - TODO: change to threaded model
notrun = []
errors = []

old_progress_stats = None
old_presub_stats = None
Expand All @@ -146,7 +147,7 @@ def run(self, graph, config, updatehash=False):
"Progress: %d jobs, %d/%d/%d "
"(done/running/ready), %d/%d "
"(pending_tasks/waiting).",
*progress_stats
*progress_stats,
)
old_progress_stats = progress_stats
toappend = []
Expand All @@ -155,14 +156,16 @@ def run(self, graph, config, updatehash=False):
taskid, jobid = self.pending_tasks.pop()
try:
result = self._get_result(taskid)
except Exception:
except Exception as exc:
notrun.append(self._clean_queue(jobid, graph))
errors.append(exc)
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
self._remove_node_dirs()
Expand Down Expand Up @@ -194,6 +197,20 @@ def run(self, graph, config, updatehash=False):
# close any open resources
self._postrun_check()

if errors:
            # If one or more nodes failed, re-raise the first of them
error, cause = errors[0], None
if isinstance(error, str):
error = RuntimeError(error)

if len(errors) > 1:
error, cause = (
RuntimeError(f"{len(errors)} raised. Re-raising first."),
error,
)

raise error from cause

def _get_result(self, taskid):
raise NotImplementedError

Expand Down
27 changes: 20 additions & 7 deletions nipype/pipeline/plugins/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def run(self, graph, config, updatehash=False):
old_wd = os.getcwd()
notrun = []
donotrun = []
stop_on_first_crash = str2bool(config["execution"]["stop_on_first_crash"])
errors = []
nodes, _ = topological_sort(graph)
for node in nodes:
endstatus = "end"
Expand All @@ -43,27 +45,38 @@ def run(self, graph, config, updatehash=False):
if self._status_callback:
self._status_callback(node, "start")
node.run(updatehash=updatehash)
except:
except Exception as exc:
endstatus = "exception"
# bare except, but i really don't know where a
# node might fail
crashfile = report_crash(node)
if str2bool(config["execution"]["stop_on_first_crash"]):
raise
# remove dependencies from queue
subnodes = [s for s in dfs_preorder(graph, node)]
notrun.append(
{"node": node, "dependents": subnodes, "crashfile": crashfile}
)
donotrun.extend(subnodes)
# Delay raising the crash until we cleaned the house
if str2bool(config["execution"]["stop_on_first_crash"]):
os.chdir(old_wd) # Return wherever we were before
report_nodes_not_run(notrun) # report before raising
raise
errors.append(exc)

if stop_on_first_crash:
break
finally:
if self._status_callback:
self._status_callback(node, endstatus)

os.chdir(old_wd) # Return wherever we were before
report_nodes_not_run(notrun)
if errors:
            # If one or more nodes failed, re-raise the first of them
error, cause = errors[0], None
if isinstance(error, str):
error = RuntimeError(error)

if len(errors) > 1:
error, cause = (
RuntimeError(f"{len(errors)} raised. Re-raising first."),
error,
)

raise error from cause
7 changes: 4 additions & 3 deletions nipype/pipeline/plugins/tests/test_sgelike.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_crashfile_creation(tmp_path):
sgelike_plugin = SGELikeBatchManagerBase("")
with pytest.raises(RuntimeError) as e:
assert pipe.run(plugin=sgelike_plugin)
assert str(e.value) == "Workflow did not execute cleanly. Check log for details"

crashfiles = tmp_path.glob("crash*crasher*.pklz")
assert len(list(crashfiles)) == 1
crashfiles = list(tmp_path.glob("crash*crasher*.pklz")) + list(
tmp_path.glob("crash*crasher*.txt")
)
assert len(crashfiles) == 1
Loading