Sbachmei/mic 5753/embarrassingly parallel basic step #153

Merged

Conversation

stevebachmeier (Collaborator):

Embarrassingly parallel basic steps (milestone)

Description

Changes and notes

This implements the milestone as written, i.e. step 3 (already a LoopStep)
is not set up to loop an EmbarrassinglyParallelStep. It's important to note
that this is not a fully fleshed-out feature:

  1. It doesn't handle multiple input slots.
  2. It doesn't handle multiple output slots.
  3. It doesn't handle multiple checkpoints.

These will be implemented in follow-up PRs.

Verification and Testing

@@ -87,6 +87,16 @@
        default=False,
        help="Do not save the results in a timestamped sub-directory of ``--output-dir``.",
    ),
    click.option(
stevebachmeier (Collaborator, Author):

Adds -vvv and --pdb options to easylink generate-dag
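For context, a hedged sketch of what such click options typically look like; the exact option wiring and help text here are assumptions, not the PR's code:

```python
import click


@click.command()
@click.option(
    "-v", "--verbose",
    count=True,
    help="Increase logging verbosity, e.g. -vvv for maximum detail.",
)
@click.option(
    "--pdb", "with_debugger",
    is_flag=True,
    help="Drop into the python debugger if an error occurs.",
)
def generate_dag(verbose: int, with_debugger: bool) -> None:
    """Generate an image of the pipeline DAG."""
```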

@@ -45,6 +45,8 @@ class PipelineGraph(ImplementationGraph):
    ----------
    config
        The :class:`~easylink.configuration.Config` object.
    freeze
stevebachmeier (Collaborator, Author):

Added this for testing purposes.
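A minimal sketch of how a freeze flag like this tends to be used in tests; the constructor call and graph API here are assumptions (PipelineGraph appears to build on networkx, whose frozen graphs raise on mutation):

```python
# Hypothetical test usage; assumes freeze defaults to True in normal runs.
graph = PipelineGraph(default_config, freeze=False)
graph.add_node("extra_node")  # mutation is only possible on an unfrozen graph
```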

from loguru import logger


def concatenate_datasets(input_files: list[str], output_filepath: str) -> None:
stevebachmeier (Collaborator, Author):

This is likely to change; we just needed something to drop in for now.
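A plausible minimal body for this placeholder aggregator, assuming parquet chunks; this is a sketch, not the PR's actual implementation:

```python
import pandas as pd
from loguru import logger


def concatenate_datasets(input_files: list[str], output_filepath: str) -> None:
    """Naively aggregate processed chunks by concatenating them into one file."""
    logger.info(f"Concatenating {len(input_files)} files into {output_filepath}")
    df = pd.concat([pd.read_parquet(path) for path in input_files], ignore_index=True)
    df.to_parquet(output_filepath)
```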

from loguru import logger


def split_data_by_size(
stevebachmeier (Collaborator, Author):

This is likely to change; we just needed something to drop in for now.
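Likewise, a hedged sketch of a size-based splitter; the signature, chunk layout, and parquet assumption are all illustrative:

```python
import math
from pathlib import Path

import pandas as pd
from loguru import logger


def split_data_by_size(
    input_files: list[str], output_dir: str, desired_chunk_size_gb: float = 1.0
) -> None:
    """Naively split input data into roughly equally sized chunk sub-directories."""
    df = pd.concat([pd.read_parquet(path) for path in input_files])
    total_size_gb = sum(Path(path).stat().st_size for path in input_files) / 1e9
    num_chunks = max(1, math.ceil(total_size_gb / desired_chunk_size_gb))
    logger.info(f"Splitting {len(df)} rows into {num_chunks} chunks")
    rows_per_chunk = math.ceil(len(df) / num_chunks)
    for i in range(num_chunks):
        chunk_dir = Path(output_dir) / f"chunk_{i}"
        chunk_dir.mkdir(parents=True, exist_ok=True)
        df.iloc[i * rows_per_chunk : (i + 1) * rows_per_chunk].to_parquet(
            chunk_dir / "result.parquet"
        )
```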

@@ -172,48 +174,91 @@ def test_update_slot_filepaths(default_config: Config, test_dir: str) -> None:
    assert edge_attrs["filepaths"] == expected_filepaths[(source, sink)]


-def test_get_input_slots(default_config: Config, test_dir: str) -> None:
+def test_get_io_slots(default_config: Config, test_dir: str) -> None:
stevebachmeier (Collaborator, Author):

get_input_slots became get_io_slots because we now need output slot data just like input slot data.

@zmbc (Collaborator) left a comment:

As usual I only skimmed the tests. Left some detailed comments but overall this looks great and has turned out much nicer than I feared!

I left a few comments to this effect, but I think we should fail aggressively when we hit situations we haven't implemented yet, to ensure things don't silently appear to work while actually broken.

run:
    splitter_utils.{self.input_slots[input_slot_to_split]["splitter"].__name__}(
        input_files=list(input.files),
        output_dir=list(output)[0],
Collaborator:

Suggested change:
-        output_dir=list(output)[0],
+        output_dir=output.output_dir,

Can you access these by name?
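For reference, Snakemake outputs declared with a keyword can be accessed by name in a run block; a minimal illustrative rule (the rule name, paths, and splitter import are assumptions):

```python
# Illustrative Snakefile fragment; assumes `import splitter_utils` at the top.
rule split_step:
    input:
        files=["input/dataset.parquet"],
    output:
        output_dir=directory("intermediate/split_step/chunks"),
    run:
        splitter_utils.split_data_by_size(
            input_files=list(input.files),
            output_dir=output.output_dir,  # named access instead of list(output)[0]
        )
```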

        rule = self._define_aggregator_rule()
        return aggregator + rule

    def _define_aggregator_function(self):
Collaborator:

I find this name confusing. I think it would be better to call this an input function and link to those Snakemake docs.
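For context, a hedged sketch of the Snakemake input-function pattern being referenced; the checkpoint name and file layout are assumptions:

```python
# Illustrative input function; Snakemake injects `checkpoints`, `expand`,
# and `glob_wildcards` into the Snakefile namespace.
def aggregate_input(wildcards):
    # .get() raises until the checkpoint has actually completed, which is
    # what defers DAG re-evaluation to runtime.
    checkpoint_dir = checkpoints.split_step.get(**wildcards).output.output_dir
    return expand(
        checkpoint_dir + "/{chunk}/result.parquet",
        chunk=glob_wildcards(checkpoint_dir + "/{chunk}/result.parquet").chunk,
    )
```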

    checkpoint_output = glob.glob(f"{{{checkpoint_name}.get(**wildcards).output.output_dir}}/*/")
else:
    output, _ = {checkpoint_name}.rule.expand_output(wildcards)
    raise IncompleteCheckpointException({checkpoint_name}.rule, checkpoint_target(output[0]))
Collaborator:

I have no idea where to put the comment (inside the string or outside) but we should have a comment pointing to the place in the Snakemake codebase that this was inspired by

stevebachmeier (Collaborator, Author):

Added it to the Notes section of the AggregationRule docstring

Collaborator:

That's a good addition, but I meant how we knew what to use for the second argument; I think the relevant link is https://github.com/snakemake/snakemake/blob/04f89d330dd94baa51f41bc796392f85bccbd231/snakemake/checkpoints.py#L42

Comment on lines 425 to 429
if os.path.exists(checkpoint_file):
    checkpoint_output = glob.glob(f"{{{checkpoint_name}.get(**wildcards).output.output_dir}}/*/")
else:
    output, _ = {checkpoint_name}.rule.expand_output(wildcards)
    raise IncompleteCheckpointException({checkpoint_name}.rule, checkpoint_target(output[0]))
Collaborator:

You could simplify the control flow a bit like this:

Suggested change:
-if os.path.exists(checkpoint_file):
-    checkpoint_output = glob.glob(f"{{{checkpoint_name}.get(**wildcards).output.output_dir}}/*/")
-else:
-    output, _ = {checkpoint_name}.rule.expand_output(wildcards)
-    raise IncompleteCheckpointException({checkpoint_name}.rule, checkpoint_target(output[0]))
+if not os.path.exists(checkpoint_file):
+    output, _ = {checkpoint_name}.rule.expand_output(wildcards)
+    raise IncompleteCheckpointException({checkpoint_name}.rule, checkpoint_target(output[0]))
+checkpoint_output = glob.glob(f"{{{checkpoint_name}.get(**wildcards).output.output_dir}}/*/")

@albrja (Contributor) left a comment:

This implementation is pretty in the weeds, but I tried to review at least some of it, which is why my comments are pretty nitpicky about docstrings.

@@ -45,6 +45,10 @@ def __init__(
        implementation_config: LayeredConfigTree,
        input_slots: Iterable["InputSlot"] = (),
        output_slots: Iterable["OutputSlot"] = (),
        is_embarrassingly_parallel: bool = False,
        # wildcards: Sequence[
albrja (Contributor):

Is this supposed to be commented out?

        )
        return input_slot_attrs, output_slot_attrs

    def get_whether_embarrassingly_parallel(self, node: str) -> bool:
albrja (Contributor):

Nit: This method doesn't really seem necessary but probably makes the formatting in the other method cleaner

stevebachmeier (Collaborator, Author):

@albrja

get_whether_embarrassingly_parallel relies on a node_name arg and is used in pipeline.Pipeline._write_implementation_rules() to determine whether a given node is embarrassingly parallel or not.

any_embarrassingly_parallel is really more of a property (I'll make it one) that relies on get_whether_embarrassingly_parallel but belongs to the PipelineGraph as a whole, i.e. whether any node is embarrassingly parallel.

I do agree that making this its own property is probably somewhat unnecessary indirection since it's only used once; I just kinda followed suit w/ the spark_is_required method (I'll change that to a property as well) right above these two for consistency.
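A minimal sketch of the property described above; the body is illustrative, not the merged code:

```python
@property
def any_embarrassingly_parallel(self) -> bool:
    """Whether any node in this PipelineGraph is embarrassingly parallel."""
    return any(
        self.get_whether_embarrassingly_parallel(node) for node in self.nodes
    )
```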

@@ -56,7 +66,7 @@ class TargetRule(Rule):
    requires_spark: bool
    """Whether or not this rule requires a Spark environment to run."""

-    def _build_rule(self) -> str:
+    def build_rule(self) -> str:
albrja (Contributor):

Why did you unprivate this?

stevebachmeier (Collaborator, Author):

My thought was that this, being a Rule's abstract method, is by definition intended to be called by other classes (the concrete instances of Rule). In other languages that enforce privacy, you literally couldn't mark an abstract method as private (because if it were private, how could you then define it in the subclass?).

Really doesn't matter much though. Just trying to get a handle on _s.
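A minimal sketch of the pattern under discussion; the TargetRule body here is illustrative:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Rule(ABC):
    @abstractmethod
    def build_rule(self) -> str:
        """Build this rule's Snakemake definition; implemented by concrete Rules."""


@dataclass
class TargetRule(Rule):
    def build_rule(self) -> str:
        return "rule all:\n    input: 'results/final.parquet'"
```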


When running an :class:`~easylink.implementation.Implementation` in an embarrassingly
parallel way, we do not know until runtime how many parallel jobs there will
be (e.g. we don't know a priori how many chunks a large incoming dataset will be split into).
albrja (Contributor):

priority?

stevebachmeier (Collaborator, Author):

a priori, i.e. "beforehand". I'll change it though; no reason to get fancy here.


@dataclass
class AggregationRule(Rule):
"""A :class:`Rule` that aggregates the processed chunks of output data.
albrja (Contributor):

It might be helpful if there were some term specifically for the output of an embarrassingly parallel step. I feel like this sentence and the next one are slightly redundant, in that both just specify that it is the output of an embarrassingly parallel step.

stevebachmeier (Collaborator, Author):

I suppose they're a little bit redundant, but I think the context that it's only really needed for EmbarrassinglyParallelSteps is pretty important (it just didn't fit in the one-line part of the docstring).

I do agree it might be better to call it something like ChunkAggregationRule, but that seems a bit of a mouthful. I won't even consider EmbarrassinglyParallelProcessedChunkAggregationRule 😆

@stevebachmeier stevebachmeier marked this pull request as draft February 25, 2025 15:41
@stevebachmeier stevebachmeier marked this pull request as ready for review February 25, 2025 23:44
@zmbc (Collaborator) left a comment:

Made a few minor comments but the updates look good overall 👍

# output_paths = ",".join(self.output)
# wildcards_subdir = "/".join([f"{{wildcards.{wc}}}" for wc in self.wildcards])
# and then in shell cmd: export DUMMY_CONTAINER_OUTPUT_PATHS={output_paths}/{wildcards_subdir}
# TODO [MIC-5877]: handle multiple wildcards, e.g.
Collaborator:

Is that really the right ticket? https://jira.ihme.washington.edu/browse/MIC-5877

If so, could use more explanation of why

Comment on lines 213 to 216
if len(self.output) > 1:
    raise NotImplementedError(
        "FIXME [MIC-5883] Multiple output files not yet supported"
    )
Collaborator:

Doesn't this need to be inside the if self.is_embarrassingly_parallel? If so, can we add a test that would fail due to this?
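A hedged sketch of the kind of test being requested; make_rule is a hypothetical helper, and the real constructor arguments will differ:

```python
import pytest


def test_multiple_outputs_only_guarded_when_embarrassingly_parallel():
    # Should raise: multiple outputs aren't supported for embarrassingly
    # parallel rules yet.
    ep_rule = make_rule(
        output=["a.parquet", "b.parquet"], is_embarrassingly_parallel=True
    )
    with pytest.raises(NotImplementedError, match="Multiple output files"):
        ep_rule.build_rule()

    # Should NOT raise: the guard shouldn't apply to non-parallel rules.
    plain_rule = make_rule(
        output=["a.parquet", "b.parquet"], is_embarrassingly_parallel=False
    )
    plain_rule.build_rule()
```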

"""
aggregator = self._define_aggregator_function()
aggregator = self._define_input_function()
Collaborator:

nit: could change this variable name, e.g. to input_function_definition

    # FIXME: handle multiple filepaths
    def _define_input_function(self):
        """Builds the `input function <https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#input-functions>`_."""
        if len(self.output_slot["filepaths"]) > 1:
Collaborator:

This feels like a slightly different thing; multiple files vs multiple slots?

stevebachmeier (Collaborator, Author) commented on Feb 26, 2025:

We only pass a single slot at a time into the AggregationRule.

@stevebachmeier stevebachmeier merged commit 664a4a5 into main Feb 26, 2025
12 checks passed
@stevebachmeier stevebachmeier deleted the sbachmei/mic-5753/embarrassingly-parallel-basic-step branch February 26, 2025 17:12