Skip to content

Commit 54bbd47

Browse files
committed
Fixed all unit tests outside of test_workflow that were broken by recent changes
1 parent 128c224 commit 54bbd47

10 files changed

+139
-53
lines changed

pydra/engine/environments.py

+36-18
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
from copy import copy
44
from .helpers import execute
55
from pathlib import Path
6+
import logging
67
from fileformats.generic import FileSet
78
from pydra.engine.helpers import list_fields
89
from pydra.utils.typing import TypeParser
910

11+
logger = logging.getLogger("pydra")
1012

1113
if ty.TYPE_CHECKING:
1214
from pydra.engine.core import Task
@@ -121,34 +123,50 @@ def get_bindings(
121123
fld: shell.arg
122124
for fld in list_fields(task.definition):
123125
if TypeParser.contains_type(FileSet, fld.type):
124-
fileset: FileSet | None = task.inputs[fld.name]
125-
if not fileset:
126+
value: FileSet | None = task.inputs[fld.name]
127+
if not value:
126128
continue
127-
if not isinstance(fileset, (os.PathLike, FileSet)):
128-
raise NotImplementedError(
129-
f"No support for generating bindings for {type(fileset)} types "
130-
f"({fileset})"
131-
)
129+
132130
copy_file = fld.copy_mode == FileSet.CopyMode.copy
133131

134-
host_path, env_path = fileset.parent, Path(f"{root}{fileset.parent}")
132+
def map_path(fileset: os.PathLike | FileSet) -> Path:
133+
host_path, env_path = fileset.parent, Path(
134+
f"{root}{fileset.parent}"
135+
)
135136

136-
# Default to mounting paths as read-only, but respect existing modes
137-
bindings[host_path] = (
138-
env_path,
139-
"rw" if copy_file or isinstance(fld, shell.outarg) else "ro",
140-
)
137+
# Default to mounting paths as read-only, but respect existing modes
138+
bindings[host_path] = (
139+
env_path,
140+
"rw" if copy_file or isinstance(fld, shell.outarg) else "ro",
141+
)
142+
return (
143+
env_path / fileset.name
144+
if isinstance(fileset, os.PathLike)
145+
else tuple(env_path / rel for rel in fileset.relative_fspaths)
146+
)
141147

142148
# Provide updated in-container paths to the command to be run. If a
143149
# fs-object, which resolves to a single path, just pass in the name of
144150
# that path relative to the location in the mount point in the container.
145151
# If it is a more complex file-set with multiple paths, then it is converted
146152
# into a tuple of paths relative to the base of the fileset.
147-
value_updates[fld.name] = (
148-
env_path / fileset.name
149-
if isinstance(fileset, os.PathLike)
150-
else tuple(env_path / rel for rel in fileset.relative_fspaths)
151-
)
153+
if TypeParser.matches(value, os.PathLike | FileSet):
154+
value_updates[fld.name] = map_path(value)
155+
elif TypeParser.matches(value, ty.Sequence[FileSet | os.PathLike]):
156+
mapped_value = []
157+
for val in value:
158+
mapped_val = map_path(val)
159+
if isinstance(mapped_val, tuple):
160+
mapped_value.extend(mapped_val)
161+
else:
162+
mapped_value.append(mapped_val)
163+
value_updates[fld.name] = mapped_value
164+
else:
165+
logger.debug(
166+
"No support for generating bindings for %s types " "(%s)",
167+
type(value),
168+
value,
169+
)
152170

153171
# Add the cache directory to the list of mounts
154172
bindings[task.cache_dir] = (f"{self.root}/{task.cache_dir}", "rw")

pydra/engine/helpers.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,12 @@ def load_and_run(task_pkl: Path, rerun: bool = False) -> Path:
536536

537537
resultfile = task.output_dir / "_result.pklz"
538538
try:
539-
task.run(rerun=rerun)
539+
if task.submitter.worker.is_async:
540+
task.submitter.loop.run_until_complete(
541+
task.submitter.worker.run_async(task, rerun=rerun)
542+
)
543+
else:
544+
task.submitter.worker.run(task, rerun=rerun)
540545
except Exception as e:
541546
# creating result and error files if missing
542547
errorfile = task.output_dir / "_error.pklz"

pydra/engine/submitter.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ def _split_definition(self) -> dict[int, "TaskDef[OutputType]"]:
725725
resolved[inpt_name] = value._get_value(
726726
workflow=self.workflow,
727727
graph=self.graph,
728-
state_index=input_ind[state_key],
728+
state_index=input_ind.get(state_key),
729729
)
730730
elif state_key in input_ind:
731731
resolved[inpt_name] = self.node.state._get_element(
@@ -786,7 +786,7 @@ def get_runnable_tasks(self, graph: DiGraph) -> list["Task[DefType]"]:
786786
if is_runnable:
787787
runnable.append(self.blocked.pop(index))
788788
self.queued.update({t.state_index: t for t in runnable})
789-
return runnable
789+
return list(self.queued.values())
790790

791791

792792
async def prepare_runnable(runnable):

pydra/engine/tests/test_helpers.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ def test_load_and_run_exception_run(tmpdir):
230230
assert result_1.outputs.out == 2
231231

232232

233-
def test_load_and_run_wf(tmpdir):
233+
@pytest.mark.parametrize("worker", ["cf", "debug"])
234+
def test_load_and_run_wf(tmpdir, worker):
234235
"""testing load_and_run for pickled task"""
235236
wf_pkl = Path(tmpdir.join("wf_main.pkl"))
236237

@@ -242,7 +243,7 @@ def Workflow(x, y=10):
242243
task = Task(
243244
name="mult",
244245
definition=Workflow(x=2),
245-
submitter=Submitter(cache_dir=tmpdir, worker="cf"),
246+
submitter=Submitter(cache_dir=tmpdir, worker=worker),
246247
)
247248

248249
with wf_pkl.open("wb") as fp:

pydra/engine/tests/test_singularity.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_singularity_2a(plugin, tmp_path):
8989
assert singu.cmdline == f"{cmd_exec} {' '.join(cmd_args)}"
9090

9191
with Submitter(
92-
worker=plugin, environment=Singularity(image=image), cache_dir=tmp_path
92+
worker="debug", environment=Singularity(image=image), cache_dir=tmp_path
9393
) as sub:
9494
res = sub(singu)
9595

pydra/engine/tests/test_specs.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ def test_input_file_hash_1(tmp_path):
9797
def A(in_file: File) -> File:
9898
return in_file
9999

100-
assert A(in_file=outfile)._hash == "9644d3998748b339819c23ec6abec520"
100+
assert A(in_file=outfile)._hash == "e708da65b720212c5ce9ed2c65aae59c"
101101

102102
with open(outfile, "w") as fp:
103103
fp.write("test")
104104

105-
assert A(in_file=outfile)._hash == "9f7f9377ddef6d8c018f1bf8e89c208c"
105+
assert A(in_file=outfile)._hash == "f726a193430352bb3b92dccf5eccff3a"
106106

107107

108108
def test_input_file_hash_2(tmp_path):
@@ -117,7 +117,7 @@ def A(in_file: File) -> File:
117117

118118
# checking specific hash value
119119
hash1 = A(in_file=file)._hash
120-
assert hash1 == "179bd3cbdc747edc4957579376fe8c7d"
120+
assert hash1 == "eba2fafb8df4bae94a7aa42bb159b778"
121121

122122
# checking if different name doesn't affect the hash
123123
file_diffname = tmp_path / "in_file_2.txt"
@@ -146,7 +146,7 @@ def A(in_file: ty.Union[File, int]) -> File:
146146

147147
# checking specific hash value
148148
hash1 = A(in_file=file)._hash
149-
assert hash1 == "179bd3cbdc747edc4957579376fe8c7d"
149+
assert hash1 == "eba2fafb8df4bae94a7aa42bb159b778"
150150

151151
# checking if different name doesn't affect the hash
152152
file_diffname = tmp_path / "in_file_2.txt"
@@ -234,7 +234,7 @@ def A(in_file: ty.List[ty.List[ty.Union[int, File]]]) -> File:
234234

235235
# checking specific hash value
236236
hash1 = A(in_file=[[file, 3]])._hash
237-
assert hash1 == "ffd7afe0ca9d4585518809a509244b4b"
237+
assert hash1 == "b583e0fd5501d3bed9bf510ce2a9e379"
238238

239239
# the same file, but int field changes
240240
hash1a = A(in_file=[[file, 5]])._hash
@@ -268,7 +268,7 @@ def A(in_file: ty.List[ty.Dict[ty.Any, ty.Union[File, int]]]) -> File:
268268

269269
# checking specific hash value
270270
hash1 = A(in_file=[{"file": file, "int": 3}])._hash
271-
assert hash1 == "ba884a74e33552854271f55b03e53947"
271+
assert hash1 == "aa2d4b708ed0dd8340582a6514bfd5ce"
272272

273273
# the same file, but int field changes
274274
hash1a = A(in_file=[{"file": file, "int": 5}])._hash

0 commit comments

Comments (0)