Skip to content

Commit

Permalink
Merge pull request #33 from tochka-public/breaking_change/remove_nodelike
Browse files Browse the repository at this point in the history

BREAKING CHANGE: (#31) Removing the special nodes classes
  • Loading branch information
ca11mejane authored Sep 22, 2024
2 parents cc2a0d0 + edd533b commit 3e85cf0
Show file tree
Hide file tree
Showing 56 changed files with 282 additions and 483 deletions.
18 changes: 3 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,17 @@
## Table of Contents

- [Usage](#usage)
- [Что нужно, чтобы сделать свой пайплайн?](#что-нужно-чтобы-сделать-свой-пайплайн)
- [Поддерживаемые типы узлов](#поддерживаемые-типы-узлов)
- [Development](#development)
- [Environment setup](#environment-setup)


## Usage
### Что нужно, чтобы сделать свой пайплайн?

1. Написать классы узлов
2. Связать узлы посредством указания зависимости
To create a pipeline:
1. Define classes that represent nodes
2. Connect the nodes by declaring each node's parent node(s), or no parent for root nodes


### Поддерживаемые типы узлов

[Протоколы](ml_pipeline_engine/types.py)

1. [DataSource](ml_pipeline_engine/base_nodes/datasources.py)
2. [FeatureBase](ml_pipeline_engine/base_nodes/feature.py)
3. [MLModelBase](ml_pipeline_engine/base_nodes/ml_model.py)
4. [ProcessorBase](ml_pipeline_engine/base_nodes/processors.py)
5. [FeatureVectorizerBase](ml_pipeline_engine/base_nodes/vectorizer.py)

Примеры использования описаны в файле [docs/usage_examples.md](docs/usage_examples.md)

## Development
Expand Down
Empty file.
15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/datasources.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/feature.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/ml_model.py

This file was deleted.

15 changes: 0 additions & 15 deletions ml_pipeline_engine/base_nodes/vectorizer.py

This file was deleted.

7 changes: 4 additions & 3 deletions ml_pipeline_engine/chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import EventManagerLike
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineId
from ml_pipeline_engine.types import PipelineResult

NodeResultT = t.TypeVar('NodeResultT')

Entrypoint = t.Optional[t.Union[NodeLike[NodeResultT], DAGLike[NodeResultT]]]
Entrypoint = t.Optional[t.Union[NodeBase[NodeResultT], DAGLike[NodeResultT]]]


@dataclass(frozen=True, repr=False)
Expand All @@ -30,7 +31,7 @@ class PipelineChartBase:


@dataclass(frozen=True, repr=False)
class PipelineChart(PipelineChartBase):
class PipelineChart(PipelineChartBase, PipelineChartLike):
"""
Основная реализация определения пайплайна ML-модели
"""
Expand Down
3 changes: 2 additions & 1 deletion ml_pipeline_engine/context/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from ml_pipeline_engine.types import ModelName
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import PipelineChartLike
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import PipelineId


class DAGPipelineContext(EventSourceMixin):
class DAGPipelineContext(EventSourceMixin, PipelineContextLike):
"""
Контекст выполнения пайплайна ML-модели
"""
Expand Down
4 changes: 2 additions & 2 deletions ml_pipeline_engine/dag/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from ml_pipeline_engine.parallelism import threads_pool_registry
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import DAGRunManagerLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeResultT
from ml_pipeline_engine.types import PipelineContextLike
from ml_pipeline_engine.types import RetryPolicyLike
Expand All @@ -23,7 +23,7 @@ class DAG(DAGLike):
output_node: NodeId
is_process_pool_needed: bool
is_thread_pool_needed: bool
node_map: t.Dict[NodeId, NodeLike]
node_map: t.Dict[NodeId, NodeBase]
retry_policy: t.Type[RetryPolicyLike] = NodeRetryPolicy
run_manager: t.Type[DAGRunManagerLike] = DAGRunConcurrentManager

Expand Down
39 changes: 21 additions & 18 deletions ml_pipeline_engine/dag_builders/annotation/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from ml_pipeline_engine.types import DAGLike
from ml_pipeline_engine.types import NodeBase
from ml_pipeline_engine.types import NodeId
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import RecurrentProtocol

__all__ = [
Expand All @@ -39,7 +38,7 @@
class AnnotationDAGBuilder:
def __init__(self) -> None:
self._dag = DiGraph(name='main-graph')
self._node_map: t.Dict[NodeId, NodeLike] = dict()
self._node_map: t.Dict[NodeId, NodeBase] = dict()
self._recurrent_sub_graphs: t.List[t.Tuple[NodeId, NodeId]] = []
self._synthetic_nodes: t.List[NodeId] = []

Expand All @@ -49,22 +48,21 @@ def _check_annotations(obj: t.Any) -> None:
Проверка наличия аннотаций типов у переданного объекта.
В случае, если есть хотя бы один не типизированный параметр, будет ошибка.
"""
run_method = get_callable_run_method(obj)

obj = get_callable_run_method(obj)

annotations = getattr(obj, '__annotations__', None)
annotations = getattr(run_method, '__annotations__', None)
parameters = [
(name, bool(parameter.empty))
for name, parameter in inspect.signature(obj).parameters.items()
for name, parameter in inspect.signature(run_method).parameters.items()
if name not in ('self', 'args', 'kwargs')
]

if not annotations and parameters:
raise errors.UndefinedAnnotation(f'Невозможно найти аннотации типов. obj={obj}')
raise errors.UndefinedAnnotation(f'Невозможно найти аннотации типов. obj={run_method}')

for name, is_empty in parameters:
if is_empty and name not in annotations:
raise errors.UndefinedParamAnnotation(f'Не указан тип для параметра name={name}, obj={obj}')
raise errors.UndefinedParamAnnotation(f'Не указан тип для параметра name={name}, obj={run_method}')

@staticmethod
def _check_base_class(node: t.Any) -> None:
Expand All @@ -80,7 +78,7 @@ def _check_base_class(node: t.Any) -> None:
f'У объекта не существует корректного базового класса, пригодного для графа. node={node}',
)

def validate_node(self, node: NodeLike) -> None:
def validate_node(self, node: NodeBase) -> None:
"""
Валидация ноды по разным правилам
"""
Expand All @@ -89,7 +87,7 @@ def validate_node(self, node: NodeLike) -> None:
self._check_annotations(node)

@staticmethod
def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:
def _get_input_marks_map(node: NodeBase) -> t.List[NodeInputSpec]:
"""
Получение меток зависимостей для входных kwarg-ов узла
"""
Expand All @@ -112,7 +110,7 @@ def _get_input_marks_map(node: NodeLike) -> t.List[NodeInputSpec]:

return inputs

def _add_node_to_map(self, node: NodeLike) -> None:
def _add_node_to_map(self, node: NodeBase) -> None:
"""
Добавление узла в мэппинг "Имя узла -> Класс/функция узла"
"""
Expand All @@ -137,15 +135,15 @@ def _add_switch_node(self, node_id: NodeId, switch_decide_node_id: NodeId) -> No
self._dag.add_node(node_id, **{NodeField.is_switch: True})
self._dag.add_edge(switch_decide_node_id, node_id, **{EdgeField.is_switch: True})

def _traverse_breadth_first_to_dag(self, input_node: NodeLike, output_node: NodeLike): # noqa
def _traverse_breadth_first_to_dag(self, input_node: NodeBase, output_node: NodeBase): # noqa
"""
Выполнить обход зависимостей классов/функций узлов, построить граф
"""

visited = {output_node}
stack = deque([output_node])

def _set_visited(node: NodeLike) -> None:
def _set_visited(node: NodeBase) -> None:
if node in visited:
return

Expand Down Expand Up @@ -299,7 +297,7 @@ def _is_executor_needed(self) -> t.Tuple[bool, bool]:

return is_process_pool_needed, is_thread_pool_needed

def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:
def build(self, input_node: NodeBase, output_node: NodeBase = None) -> DAGLike:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
"""
Expand Down Expand Up @@ -327,8 +325,8 @@ def build(self, input_node: NodeLike, output_node: NodeLike = None) -> DAGLike:


def build_dag(
input_node: NodeLike[t.Any],
output_node: NodeLike[NodeResultT],
input_node: NodeBase[t.Any],
output_node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
"""
Построить граф путем сборки зависимостей по аннотациям типа (меткам входов)
Expand All @@ -347,7 +345,9 @@ def build_dag(
)


def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
def build_dag_single(
node: NodeBase[NodeResultT],
) -> DAGLike[NodeResultT]:
"""
Построить граф из одного узла
Expand All @@ -357,4 +357,7 @@ def build_dag_single(node: NodeLike[NodeResultT]) -> DAGLike[NodeResultT]:
Returns:
Граф
"""
return AnnotationDAGBuilder().build(input_node=node, output_node=None)
return (
AnnotationDAGBuilder()
.build(input_node=node, output_node=None)
)
34 changes: 17 additions & 17 deletions ml_pipeline_engine/dag_builders/annotation/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,75 +2,75 @@
from dataclasses import dataclass

from ml_pipeline_engine.types import CaseLabel
from ml_pipeline_engine.types import NodeLike
from ml_pipeline_engine.types import NodeBase

NodeResultT = t.TypeVar('NodeResultT')


@dataclass(frozen=True)
class InputGenericMark:
    """Dependency mark: a generic input produced by *node* (see ``InputGeneric``)."""

    # Node class/function whose result will be injected into the dependent node.
    node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputMark:
    """Dependency mark: a plain input produced by *node* (see ``Input``)."""

    # Node class/function whose result will be injected into the dependent node.
    node: NodeBase[t.Any]


@dataclass(frozen=True)
class InputOneOfMark:
    """Dependency mark: first-successful-of-many input (see ``InputOneOf``)."""

    # Candidate nodes; the DAG builder uses the first one that completes successfully.
    nodes: t.List[NodeBase[t.Any]]


def InputOneOf(nodes: t.List[NodeBase[NodeResultT]]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """
    Accept a list of nodes; the result of the first successfully executed node is used.

    Args:
        nodes: Candidate nodes, tried until one succeeds.

    Returns:
        An opaque dependency mark for the DAG builder, cast so type-checkers
        treat it as the node result type.
    """
    return t.cast(t.Any, InputOneOfMark(nodes))


def InputGeneric(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a generic input dependency; returns an opaque mark cast to the result type."""
    return t.cast(t.Any, InputGenericMark(node))


def Input(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a plain input dependency; returns an opaque mark cast to the result type."""
    return t.cast(t.Any, InputMark(node))


@dataclass(frozen=True)
class GenericInputMark:
    """Dependency mark produced by ``GenericInput``."""

    # Node class/function whose result will be injected into the dependent node.
    node: NodeBase[t.Any]


def GenericInput(node: NodeBase[NodeResultT]) -> t.Type[NodeResultT]:  # noqa: N802,RUF100
    """Mark *node* as a generic input dependency; returns an opaque mark cast to the result type."""
    return t.cast(t.Any, GenericInputMark(node))


@dataclass(frozen=True)
class SwitchCaseMark:
    """Dependency mark describing a switch/case branch (see ``SwitchCase``)."""

    # Node whose result selects which case branch to run.
    switch: NodeBase[t.Any]
    # (case label, node) pairs; the node matching the switch result is executed.
    cases: t.List[t.Tuple[str, NodeBase]]
    # Human-readable name for the synthetic switch node.
    name: str


def SwitchCase(  # noqa: N802,RUF100
    switch: NodeBase[t.Any],
    cases: t.List[t.Tuple[CaseLabel, NodeBase[NodeResultT]]],
    name: t.Optional[str] = None,
) -> t.Type[NodeResultT]:
    """
    Declare a switch/case dependency.

    Args:
        switch: Node whose result selects the branch.
        cases: (label, node) pairs; the node whose label matches the switch
            result is executed.
        name: Optional name for the synthetic switch node.

    Returns:
        An opaque dependency mark for the DAG builder, cast to the result type.
    """
    return t.cast(t.Any, SwitchCaseMark(switch, cases, name))


@dataclass(frozen=True)
class RecurrentSubGraphMark:
    """Dependency mark describing a recurrent sub-graph (see ``RecurrentSubGraph``)."""

    # Entry node of the recurrent sub-graph.
    start_node: NodeBase[NodeResultT]
    # Terminal node of the recurrent sub-graph.
    dest_node: NodeBase[NodeResultT]
    # Upper bound on sub-graph re-executions.
    max_iterations: int


def RecurrentSubGraph( # noqa: N802,RUF100
start_node: t.Type[NodeLike[NodeResultT]],
dest_node: t.Type[NodeLike[NodeResultT]],
start_node: t.Type[NodeBase[NodeResultT]],
dest_node: t.Type[NodeBase[NodeResultT]],
max_iterations: int,
) -> t.Type[NodeResultT]:
"""
Expand Down
Loading

0 comments on commit 3e85cf0

Please sign in to comment.