[doc] wip: update docstrings for core.py #317

Open: wants to merge 5 commits into master
151 changes: 116 additions & 35 deletions pydra/engine/core.py
@@ -49,31 +49,60 @@

class TaskBase:
"""
A base structure for the nodes in the processing graph.
The class that acts as a base structure for all tasks in a graph.

Tasks are a generic compute step from which both elementary tasks and
A task is a generic compute step from which both :class:`FunctionTask` and
:class:`Workflow` instances inherit.

TODO: Which functions are in TaskBase that are not overwritten in subclasses?
Collaborator comment: this might depend on the specific class; it can't be generalized.


Attributes
----------
name : str
Unique name of this node.
inputs : TODO
XXXX
input_names : :obj:`List` of :obj:`str`
Name(s) of input(s) to this node.
input_spec : TODO
XXXX


cache_dir : :obj:`os.pathlike` or None
Path to directory to store output. Save new cache here if prior cache couldn't be
found here or in `cache_locations`.
cache_locations : :obj:`os.pathlike` or :obj:`list` of :obj:`os.pathlike` or None
Path or list of paths to search for cached results. Reuse cache if found.
allow_cache_override : :obj:`bool`
TODO

rerun : :obj:`bool`
Whether results should be checked (does not propagate to nodes)

audit : :class:`Audit`
Configure provenance tracking. Default is no provenance tracking.

TODO: Finish this
"""

_api_version: str = "0.0.1" # Should generally not be touched by subclasses
_etelemetry_version_data = None # class variable to store etelemetry information
_version: str # Version of tool being wrapped
_task_version: ty.Optional[str] = None
# Task writers encouraged to define and increment when implementation changes sufficiently
_input_sets = None # Dictionaries of predefined input settings

audit_flags: AuditFlag = AuditFlag.NONE
"""What to audit -- available flags: :class:`~pydra.utils.messenger.AuditFlag`."""

_can_resume = False # Does the task allow resuming from previous state
_redirect_x = False # Whether an X session should be created/directed

_runtime_requirements = RuntimeSpec()
_runtime_hints = None

# Task writers encouraged to define and increment when implementation changes sufficiently
_cache_dir = None # Working directory in which to operate
_references = None # List of references for a task
_input_sets = None # Dictionaries of predefined input settings

audit_flags: AuditFlag = AuditFlag.NONE
"""What to audit -- available flags: :class:`~pydra.utils.messenger.AuditFlag`."""

def __init__(
self,
@@ -82,9 +111,9 @@ def __init__(
cache_dir=None,
cache_locations=None,
inputs: ty.Optional[ty.Union[ty.Text, File, ty.Dict]] = None,
messenger_args=None,
messengers=None,
rerun=False,
messengers=None,
messenger_args=None,
):
"""
Initialize a task.
@@ -105,32 +134,42 @@
Parameters
----------
name : :obj:`str`
Unique name of this node
Unique name of this node.
audit_flags : :class:`AuditFlag`, optional
Configure provenance tracking. Default is no provenance tracking.
See available flags at :class:`~pydra.utils.messenger.AuditFlag`.
cache_dir : :obj:`os.pathlike`
Set a custom directory of previously computed nodes.
cache_locations :
TODO
cache_dir : :obj:`os.pathlike` or None
Path to directory to store output. Save new cache here if prior cache couldn't be
found here or in `cache_locations`.
cache_locations : :obj:`os.pathlike` or :obj:`list` of :obj:`os.pathlike` or None
Path or list of paths to search for cached results. Reuse cache if found.
inputs : :obj:`typing.Text`, or :class:`File`, or :obj:`dict`, or `None`.
Set particular inputs to this node.
messenger_args :
TODO
messengers :
TODO
Name(s) of input(s) to this node.
File path(s) are given if inputs are files.
Used in the attributes `inputs` and `input_names`.
rerun : :obj:`bool`
Whether results should be checked (does not propagate to nodes)

Other Parameters
----------------
messengers : :class:`Messenger` or :obj:`list` of :class:`Messenger` or None
Messenger(s) used by Audit. Saved in the `audit` attribute.
See available messengers at :class:`~pydra.utils.messenger.Messenger`.
messenger_args : TODO what type?
Argument(s) used by `messengers`. Saved in the `audit` attribute.

"""
from .. import check_latest_version

if TaskBase._etelemetry_version_data is None:
TaskBase._etelemetry_version_data = check_latest_version()

# raise error if name is same as of attributes
if name in dir(self):
if name in dir(self): # raise error if name is same as of attributes
raise ValueError("Cannot use names of attributes or methods as task name")
self.name = name
if not self.input_spec:

# set inputs: TODO should we type check in spec?
if not self.input_spec: # `input_spec` saves information in inputs/outputs
raise Exception("No input_spec in class: %s" % self.__class__.__name__)
klass = make_klass(self.input_spec)

@@ -175,6 +214,23 @@ def __init__(
if self._input_sets is None:
self._input_sets = {}

self.cache_dir = cache_dir
self.cache_locations = cache_locations
self.allow_cache_override = True

self.inp_lf = {} # dict to save the connections with lazy fields
self._output = {}
self._result = {}

self.plugin = None
self.state = None
self.task_rerun = rerun

self._checksum = None
self._done = False # if node finished all jobs
self._errored = False

# messenger and hooks
self.audit = Audit(
audit_flags=audit_flags,
messengers=messengers,
@@ -191,7 +247,6 @@ def __init__(

self.plugin = None
self.hooks = TaskHook()
self._errored = False

def __str__(self):
return self.name
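
For illustration, a minimal usage sketch of the constructor and caching parameters described above (not taken from the PR itself; the `pydra.mark.task`-decorated function `add` and the paths below are placeholders):

```python
import pydra

@pydra.mark.task
def add(a, b):
    return a + b

task = add(a=1, b=2)
# cache_locations is searched first for a matching cached result; if none is
# found, the task runs and writes its output under cache_dir (see the
# cache_dir / cache_locations property setters later in this diff).
task.cache_dir = "/tmp/pydra_cache"                  # placeholder path
task.cache_locations = ["/data/shared_pydra_cache"]  # placeholder path
result = task()
print(result.output.out)  # -> 3
```

Results found under `cache_locations` are reused as-is; new results always land under `cache_dir`.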
@@ -237,10 +292,21 @@ def errored(self):

@property
def checksum(self):
""" Calculates the unique checksum of the task.
Used to create specific directory name for tasks that are run;
and to create node checksums needed for graph checksums
(before the tasks have inputs etc.)
"""
Calculates the unique checksum of the task.

* Used to create name of cache directory and to create graph checksums.
* If the task has a splitter, the hash of the splitter is used in the checksum calculation.
* The hash of self.inputs is used to calculate the task checksum, but the values
of the inputs are not required for the calculation itself (an input can be a `LazyField`
object that will be computed later, or empty if the task doesn't
require any inputs).

Returns
-------
checksum
checksum name TODO

"""
input_hash = self.inputs.hash
if self.state is None:
@@ -254,7 +320,7 @@ def checksum(self):

def checksum_states(self, state_index=None):
"""
Calculate a checksum for the specific state or all of the states of the task.
Calculates a checksum for the specific state or all of the states of the task.
Replaces lists in the input fields with specific values for states.
Used to recreate the names of the task directories,

@@ -313,7 +379,9 @@ def uid(self):

def set_state(self, splitter, combiner=None):
"""
Set a particular state on this task.
Sets a particular State on this task.
Updates splitter and combiner information when the `split` or `combine` method
is called.

Parameters
----------
@@ -371,7 +439,7 @@ def _run_task(self):

@property
def cache_dir(self):
"""Get the location of the cache directory."""
"""Get directory path to write cache if a prior cache is not found in `cache_locations`."""
return self._cache_dir

@cache_dir.setter
@@ -385,7 +453,7 @@ def cache_dir(self, location):

@property
def cache_locations(self):
"""Get the list of cache sources."""
"""Get the list of directories used to search for cached results."""
return self._cache_locations + ensure_list(self._cache_dir)

@cache_locations.setter
@@ -560,7 +628,7 @@ def combine(self, combiner, overwrite=False):
def get_input_el(self, ind):
"""Collect all inputs required to run the node (for specific state element)."""
if ind is not None:
# TODO: doesnt work properly for more cmplicated wf (check if still an issue)
# TODO: doesn't work properly for more complicated wf (check if still an issue)
state_dict = self.state.states_val[ind]
input_ind = self.state.inputs_ind[ind]
inputs_dict = {}
@@ -623,6 +691,18 @@ def done(self):
return False

def _combined_output(self, return_inputs=False):
"""
Combine outputs of a split task.

Parameters
----------
return_inputs : :obj:`bool` or :obj:`str`
if True or "val" results are returned together with values of the input fields,
if "ind" results are returned together with indices of the input fields

Returns
-------
combined_results : TODO

"""
combined_results = []
for (gr, ind_l) in self.state.final_combined_ind_mapping.items():
combined_results_gr = []
Expand Down Expand Up @@ -650,13 +730,14 @@ def result(self, state_index=None, return_inputs=False):
----------
state_index : :obj:`int`
index of the element for task with splitter and multiple states
return_inputs : :obj: `bool`, :obj:`str`
return_inputs : :obj:`bool` or :obj:`str`
if True or "val" result is returned together with values of the input fields,
if "ind" result is returned together with indices of the input fields

Returns
-------
result :
result
TODO

"""
# TODO: check if result is available in load_result and
@@ -748,7 +829,7 @@ def __init__(
Parameters
----------
name : :obj:`str`
Unique name of this node
Unique name of this workflow.
audit_flags : :class:`AuditFlag`, optional
Configure provenance tracking. Default is no provenance tracking.
See available flags at :class:`~pydra.utils.messenger.AuditFlag`.