Skip to content

Commit

Permalink
Sbachmei/mic 5861/fix templated step (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevebachmeier authored Feb 20, 2025
1 parent 1244c48 commit 1d36b3f
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 60 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
**0.1.4 - 2/18/25**
**0.1.5 - 2/20/25**

- Fix handling of templated steps when no looping or parallelism is requested

**0.1.4 - 2/20/25**

- Implement duplicate_template_step method on TemplatedStep class

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: easylink.pipeline_schema_constants.testing

This file was deleted.

2 changes: 1 addition & 1 deletion src/easylink/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Config(LayeredConfigTree):
def __init__(
self,
config_params: dict[str, Any],
potential_schemas: list[PipelineSchema] | PipelineSchema = PIPELINE_SCHEMAS,
potential_schemas: PipelineSchema | list[PipelineSchema] = PIPELINE_SCHEMAS,
) -> None:
super().__init__(layers=["initial_data", "default", "user_configured"])
self.update(DEFAULT_ENVIRONMENT, layer="default")
Expand Down
9 changes: 5 additions & 4 deletions src/easylink/pipeline_schema_constants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from easylink.pipeline_schema_constants import development, tests
from easylink.pipeline_schema_constants import development, testing

ALLOWED_SCHEMA_PARAMS = {
"development": development.SCHEMA_PARAMS,
}

TESTING_SCHEMA_PARAMS = {
"integration": tests.SINGLE_STEP_SCHEMA_PARAMS,
"combined_bad_topology": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
"combined_bad_implementation_names": tests.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
"integration": testing.SINGLE_STEP_SCHEMA_PARAMS,
"combined_bad_topology": testing.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
"combined_bad_implementation_names": testing.BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS,
"nested_templated_steps": testing.NESTED_TEMPLATED_STEPS_SCHEMA_PARAMS,
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
OutputSlot,
OutputSlotMapping,
)
from easylink.step import HierarchicalStep, InputStep, LoopStep, OutputStep, Step
from easylink.step import (
HierarchicalStep,
InputStep,
LoopStep,
OutputStep,
ParallelStep,
Step,
)
from easylink.utilities.validation_utils import validate_input_file_dummy

SINGLE_STEP_NODES = [
Expand Down Expand Up @@ -128,3 +135,85 @@
]

BAD_COMBINED_TOPOLOGY_SCHEMA_PARAMS = (BAD_COMBINED_TOPOLOGY_NODES, SINGLE_STEP_EDGES)


NESTED_TEMPLATED_STEPS_NODES = [
InputStep(),
LoopStep(
template_step=ParallelStep(
template_step=HierarchicalStep(
step_name="step_1",
input_slots=[
InputSlot(
name="step_1_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1_main_output")],
nodes=[
Step(
step_name="step_1a",
input_slots=[
InputSlot(
name="step_1a_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1a_main_output")],
),
Step(
step_name="step_1b",
input_slots=[
InputSlot(
name="step_1b_main_input",
env_var="DUMMY_CONTAINER_MAIN_INPUT_FILE_PATHS",
validator=validate_input_file_dummy,
),
],
output_slots=[OutputSlot("step_1b_main_output")],
),
],
edges=[
EdgeParams(
source_node="step_1a",
target_node="step_1b",
output_slot="step_1a_main_output",
input_slot="step_1b_main_input",
),
],
input_slot_mappings=[
InputSlotMapping(
parent_slot="step_1_main_input",
child_node="step_1a",
child_slot="step_1a_main_input",
),
],
output_slot_mappings=[
OutputSlotMapping(
parent_slot="step_1_main_output",
child_node="step_1b",
child_slot="step_1b_main_output",
),
],
),
),
self_edges=[
EdgeParams(
source_node="step_1",
target_node="step_1",
output_slot="step_1_main_output",
input_slot="step_1_main_input",
),
],
),
OutputStep(
input_slots=[
InputSlot(name="result", env_var=None, validator=validate_input_file_dummy)
],
),
]


NESTED_TEMPLATED_STEPS_SCHEMA_PARAMS = (NESTED_TEMPLATED_STEPS_NODES, SINGLE_STEP_EDGES)
85 changes: 64 additions & 21 deletions src/easylink/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ def set_configuration_state(
) -> None:
"""Sets the configuration state for this ``Step``.
The so-called 'configuration state' for a given ``Step`` is backed up by
a :class:`ConfigurationState` class and is assigned to its :attr:`_configuration_state`
attribute. There are two possible ``ConfigurationStates``:
:class:`LeafConfigurationState` and :class:`NonLeafConfigurationState`.
This method sets the configuration state of this ``Step`` based on whether
or not a :attr:`config_key` is set *and exists is the ``Step's`` configuration*
(i.e. its portion of the user-suppled pipeline specification
file); any required deviation from this behavior requires special
handling.
Parameters
----------
parent_config
Expand Down Expand Up @@ -378,8 +389,9 @@ def _validate_nonleaf(
) -> dict[str, list[str]]:
"""Validates a non-leaf ``Step``."""
errors = {}
for node in self.step_graph.nodes:
step = self.step_graph.nodes[node]["step"]
nodes = self.step_graph.nodes
for node in nodes:
step = nodes[node]["step"]
if isinstance(step, IOStep):
continue
if step.name not in step_config:
Expand All @@ -390,7 +402,7 @@ def _validate_nonleaf(
)
if step_errors:
errors.update(step_errors)
extra_steps = set(step_config.keys()) - set(self.step_graph.nodes)
extra_steps = set(step_config.keys()) - set(nodes)
for extra_step in extra_steps:
errors[f"step {extra_step}"] = [f"{extra_step} is not a valid step."]
return errors
Expand Down Expand Up @@ -807,12 +819,43 @@ def set_configuration_state(
The configuration for any implementations to be combined.
input_data_config
The input data configuration for the entire pipeline.
Notes
-----
A ``TemplatedStep`` is always assigned a :class:`NonLeafConfigurationState`
even if it has no multiplicity since (despite having no copies to make) we
still need to traverse the sub-``Steps`` to get to the one with a single
:class:`~easylink.implementation.Implementation`, i.e. the one with a
:class:`LeafConfigurationState`.
"""
num_repeats = len(self._get_config(parent_config[self.name]))
self.step_graph = self._update_step_graph(num_repeats)
self.slot_mappings = self._update_slot_mappings(num_repeats)
super().set_configuration_state(
parent_config, combined_implementations, input_data_config
step_config = parent_config[self.name]
if self.config_key not in step_config:
# Special handle the step_graph update
self.step_graph = StepGraph()
self.template_step.name = self.name
self.step_graph.add_node_from_step(self.template_step)
# Special handle the slot_mappings update
input_mappings = [
InputSlotMapping(slot, self.name, slot) for slot in self.input_slots
]
output_mappings = [
OutputSlotMapping(slot, self.name, slot) for slot in self.output_slots
]
self.slot_mappings = {"input": input_mappings, "output": output_mappings}
# Add the key back to the expanded config
expanded_config = LayeredConfigTree({self.name: step_config})
else:
expanded_config = self._get_config(step_config)
num_repeats = len(expanded_config)
self.step_graph = self._update_step_graph(num_repeats)
self.slot_mappings = self._update_slot_mappings(num_repeats)
# Manually set the configuration state to non-leaf instead of relying
# on super().get_configuration_state() because that method will erroneously
# set to leaf state when we have no multiplicity (because in that case the
# user didn't actually include the config_key in the pipeline specification
# file, hence num_repeats == 1)
self._configuration_state = NonLeafConfigurationState(
self, expanded_config, combined_implementations, input_data_config
)

def _duplicate_template_step(self) -> Step:
Expand Down Expand Up @@ -1105,9 +1148,10 @@ def validate_step(
initial ones are handled.
We update the :class:`easylink.graph_components.StepGraph` and ``SlotMappings``
here as opposed to in :meth:`set_configuration_state` (as is done in :class:`TemplatedStep`)
because ``ChoiceStep`` validation happens prior to setting the configuration
state and actually requires the ``StepGraph`` and ``SlotMappings``.
in :meth:`validate_step` (as opposed to in :meth:`set_configuration_state`
as is done in :class:`TemplatedStep`) because :meth:`validate_step` is called
prior to :meth:`set_configuration_state`, but the validations itself actually
requires the updated ``StepGraph`` and ``SlotMappings``.
We do not attempt to validate the subgraph here if the 'type' key is unable
to be validated.
Expand Down Expand Up @@ -1136,7 +1180,7 @@ def validate_step(
]
}

# Handle the actual chosen step_config
# HACK: Update the step graph and mappings here because we need them for validation
self.step_graph = self._update_step_graph(subgraph)
self.slot_mappings = self._update_slot_mappings(subgraph)
# NOTE: A ChoiceStep is by definition non-leaf step
Expand All @@ -1163,11 +1207,11 @@ def set_configuration_state(
Notes
-----
We update the :class:`~easylink.graph_components.StepGraph` and
:class:`SlotMappings<easylink.graph_components.SlotMapping>` in
:meth:`validate_step` as opposed to here (as is done with
:class:`TemplatedSteps<TemplatedStep>`) because ``ChoiceStep`` validation
happens prior to this but requires the ``StepGraph`` and ``SlotMappings``.
We update the :class:`easylink.graph_components.StepGraph` and ``SlotMappings``
in :meth:`validate_step` (as opposed to in :meth:`set_configuration_state`
as is done in :class:`TemplatedStep`) because :meth:`validate_step` is called
prior to :meth:`set_configuration_state`, but the validations itself actually
requires the updated ``StepGraph`` and ``SlotMappings``.
"""

chosen_parent_config = LayeredConfigTree(
Expand Down Expand Up @@ -1364,7 +1408,6 @@ def get_implementation_edges(self, edge: EdgeParams) -> list[EdgeParams]:
for mapping in mappings:
imp_edge = mapping.remap_edge(edge)
implementation_edges.append(imp_edge)

elif edge.target_node == self._step.name:
mappings = [
mapping
Expand Down Expand Up @@ -1520,7 +1563,6 @@ def get_implementation_edges(self, edge: EdgeParams) -> list[EdgeParams]:
new_step = self._step.step_graph.nodes[mapping.child_node]["step"]
imp_edges = new_step.get_implementation_edges(new_edge)
implementation_edges.extend(imp_edges)

elif edge.target_node == self._step.name:
mappings = [
mapping
Expand All @@ -1544,8 +1586,9 @@ def _configure_subgraph_steps(self) -> None:
This method recursively traverses the ``StepGraph`` and sets the configuration
state for each ``Step`` until reaching all leaf nodes.
"""
for node in self._step.step_graph.nodes:
step = self._step.step_graph.nodes[node]["step"]
nodes = self._step.step_graph.nodes
for node in nodes:
step = nodes[node]["step"]
step.set_configuration_state(
self.pipeline_config, self.combined_implementations, self.input_data_config
)
36 changes: 36 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,42 @@
},
},
},
"nested_templated_steps": {
"step_1": {
"iterate": [
{ # loop 1: parallel with multiplicity
"parallel": [
{
"implementation": {"name": "step_1_python_pandas"},
"input_data_file": "file1",
},
{
"implementation": {"name": "step_1_python_pandas"},
"input_data_file": "file2",
},
],
},
{ # loop 2: parallel with no multiplicity
"parallel": [
{
"input_data_file": "file1",
"implementation": {"name": "step_1_python_pandas"},
},
],
},
{ # loop 3: missing 'parallel' key, uses hierarchical step
"substeps": {
"step_1a": {
"implementation": {"name": "step_1a_python_pandas"},
},
"step_1b": {
"implementation": {"name": "step_1b_python_pandas"},
},
},
},
],
},
},
}

INPUT_DATA_FORMAT_DICT = {
Expand Down
Loading

0 comments on commit 1d36b3f

Please sign in to comment.