Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove sync_compatible from build_server #16314

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/prefect/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ async def suspend_flow_run(
flow_run_id: Optional[UUID] = None,
timeout: Optional[int] = 3600,
key: Optional[str] = None,
client: PrefectClient = None,
client: Optional[PrefectClient] = None,
) -> None:
...

Expand All @@ -318,7 +318,7 @@ async def suspend_flow_run(
flow_run_id: Optional[UUID] = None,
timeout: Optional[int] = 3600,
key: Optional[str] = None,
client: PrefectClient = None,
client: Optional[PrefectClient] = None,
) -> T:
...

Expand All @@ -330,7 +330,7 @@ async def suspend_flow_run(
flow_run_id: Optional[UUID] = None,
timeout: Optional[int] = 3600,
key: Optional[str] = None,
client: PrefectClient = None,
client: Optional[PrefectClient] = None,
) -> Optional[T]:
"""
Suspends a flow run by stopping code execution until resumed.
Expand Down
179 changes: 77 additions & 102 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
Callable,
Coroutine,
Generic,
Hashable,
Iterable,
NoReturn,
Optional,
Expand All @@ -43,7 +42,7 @@
from pydantic.v1.decorator import ValidatedFunction as V1ValidatedFunction
from pydantic.v1.errors import ConfigError # TODO
from rich.console import Console
from typing_extensions import Literal, ParamSpec, Self
from typing_extensions import Literal, ParamSpec, TypeAlias

from prefect._internal.concurrency.api import create_call, from_async
from prefect.blocks.core import Block
Expand Down Expand Up @@ -105,7 +104,11 @@
T = TypeVar("T") # Generic type var for capturing the inner return type of async funcs
R = TypeVar("R") # The return type of the user's function
P = ParamSpec("P") # The parameters of the flow
F = TypeVar("F", bound="Flow") # The type of the flow
F = TypeVar("F", bound="Flow[Any, Any]") # The type of the flow

StateHookCallable: TypeAlias = Callable[
[FlowSchema, FlowRun, State], Union[Awaitable[None], None]
]
Comment on lines +109 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧼


logger = get_logger("flows")

Expand Down Expand Up @@ -195,15 +198,11 @@ def __init__(
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
on_completion: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_failure: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_cancellation: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_completion: Optional[list[StateHookCallable]] = None,
on_failure: Optional[list[StateHookCallable]] = None,
on_cancellation: Optional[list[StateHookCallable]] = None,
on_crashed: Optional[list[StateHookCallable]] = None,
on_running: Optional[list[StateHookCallable]] = None,
):
if name is not None and not isinstance(name, str):
raise TypeError(
Expand Down Expand Up @@ -375,7 +374,7 @@ def __init__(
def ismethod(self) -> bool:
return hasattr(self.fn, "__prefect_self__")

def __get__(self, instance, owner):
def __get__(self, instance: Any, owner: Any):
"""
Implement the descriptor protocol so that the flow can be used as an instance method.
When an instance method is loaded, this method is called with the "self" instance as
Expand All @@ -402,24 +401,22 @@ def with_options(
retry_delay_seconds: Optional[Union[int, float]] = None,
description: Optional[str] = None,
flow_run_name: Optional[Union[Callable[[], str], str]] = None,
task_runner: Union[Type[TaskRunner], TaskRunner, None] = None,
task_runner: Union[
Type[TaskRunner[PrefectFuture[R]]], TaskRunner[PrefectFuture[R]], None
] = None,
timeout_seconds: Union[int, float, None] = None,
validate_parameters: Optional[bool] = None,
persist_result: Optional[bool] = NotSet, # type: ignore
result_storage: Optional[ResultStorage] = NotSet, # type: ignore
result_serializer: Optional[ResultSerializer] = NotSet, # type: ignore
cache_result_in_memory: Optional[bool] = None,
log_prints: Optional[bool] = NotSet, # type: ignore
on_completion: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_failure: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_cancellation: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
) -> Self:
on_completion: Optional[list[StateHookCallable]] = None,
on_failure: Optional[list[StateHookCallable]] = None,
on_cancellation: Optional[list[StateHookCallable]] = None,
on_crashed: Optional[list[StateHookCallable]] = None,
on_running: Optional[list[StateHookCallable]] = None,
) -> "Flow[P, R]":
"""
Create a new flow from the current object, updating provided options.

Expand Down Expand Up @@ -645,7 +642,7 @@ async def to_deployment(
paused: Optional[bool] = None,
schedules: Optional["FlexibleScheduleList"] = None,
concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
parameters: Optional[dict] = None,
parameters: Optional[dict[str, Any]] = None,
triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
description: Optional[str] = None,
tags: Optional[list[str]] = None,
Expand Down Expand Up @@ -755,33 +752,23 @@ def my_other_flow(name):
entrypoint_type=entrypoint_type,
)

def on_completion(
self, fn: Callable[[FlowSchema, FlowRun, State], None]
) -> Callable[[FlowSchema, FlowRun, State], None]:
def on_completion(self, fn: StateHookCallable) -> StateHookCallable:
self.on_completion_hooks.append(fn)
return fn

def on_cancellation(
self, fn: Callable[[FlowSchema, FlowRun, State], None]
) -> Callable[[FlowSchema, FlowRun, State], None]:
def on_cancellation(self, fn: StateHookCallable) -> StateHookCallable:
self.on_cancellation_hooks.append(fn)
return fn

def on_crashed(
self, fn: Callable[[FlowSchema, FlowRun, State], None]
) -> Callable[[FlowSchema, FlowRun, State], None]:
def on_crashed(self, fn: StateHookCallable) -> StateHookCallable:
self.on_crashed_hooks.append(fn)
return fn

def on_running(
self, fn: Callable[[FlowSchema, FlowRun, State], None]
) -> Callable[[FlowSchema, FlowRun, State], None]:
def on_running(self, fn: StateHookCallable) -> StateHookCallable:
self.on_running_hooks.append(fn)
return fn

def on_failure(
self, fn: Callable[[FlowSchema, FlowRun, State], None]
) -> Callable[[FlowSchema, FlowRun, State], None]:
def on_failure(self, fn: StateHookCallable) -> StateHookCallable:
self.on_failure_hooks.append(fn)
return fn

Expand Down Expand Up @@ -1039,8 +1026,11 @@ def my_flow(name: str = "world"):
await storage.pull_code()

full_entrypoint = str(storage.destination / entrypoint)
flow: Flow = await from_async.wait_for_call_in_new_thread(
create_call(load_flow_from_entrypoint, full_entrypoint)
flow = cast(
Flow[P, R],
await from_async.wait_for_call_in_new_thread(
create_call(load_flow_from_entrypoint, full_entrypoint)
),
)
flow._storage = storage
flow._entrypoint = entrypoint
Expand Down Expand Up @@ -1442,17 +1432,11 @@ def flow(
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
on_completion: Optional[
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_failure: Optional[
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_cancellation: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_completion: Optional[list[StateHookCallable]] = None,
on_failure: Optional[list[StateHookCallable]] = None,
on_cancellation: Optional[list[StateHookCallable]] = None,
on_crashed: Optional[list[StateHookCallable]] = None,
on_running: Optional[list[StateHookCallable]] = None,
) -> Callable[[Callable[P, R]], Flow[P, R]]:
...

Expand All @@ -1474,17 +1458,11 @@ def flow(
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
on_completion: Optional[
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_failure: Optional[
list[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_cancellation: Optional[
list[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_crashed: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_running: Optional[list[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_completion: Optional[list[StateHookCallable]] = None,
on_failure: Optional[list[StateHookCallable]] = None,
on_cancellation: Optional[list[StateHookCallable]] = None,
on_crashed: Optional[list[StateHookCallable]] = None,
on_running: Optional[list[StateHookCallable]] = None,
):
"""
Decorator to designate a function as a Prefect workflow.
Expand Down Expand Up @@ -1593,30 +1571,27 @@ def flow(
if isinstance(__fn, (classmethod, staticmethod)):
method_decorator = type(__fn).__name__
raise TypeError(f"@{method_decorator} should be applied on top of @flow")
return cast(
Flow[P, R],
Flow(
fn=__fn,
name=name,
version=version,
flow_run_name=flow_run_name,
task_runner=task_runner,
description=description,
timeout_seconds=timeout_seconds,
validate_parameters=validate_parameters,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
persist_result=persist_result,
result_storage=result_storage,
result_serializer=result_serializer,
cache_result_in_memory=cache_result_in_memory,
log_prints=log_prints,
on_completion=on_completion,
on_failure=on_failure,
on_cancellation=on_cancellation,
on_crashed=on_crashed,
on_running=on_running,
),
return Flow(
fn=__fn,
name=name,
version=version,
flow_run_name=flow_run_name,
task_runner=task_runner,
description=description,
timeout_seconds=timeout_seconds,
validate_parameters=validate_parameters,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
persist_result=persist_result,
result_storage=result_storage,
result_serializer=result_serializer,
cache_result_in_memory=cache_result_in_memory,
log_prints=log_prints,
on_completion=on_completion,
on_failure=on_failure,
on_cancellation=on_cancellation,
on_crashed=on_crashed,
on_running=on_running,
)
else:
return cast(
Expand Down Expand Up @@ -1668,10 +1643,10 @@ def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]:


def select_flow(
flows: Iterable[Flow],
flows: Iterable[Flow[P, R]],
flow_name: Optional[str] = None,
from_message: Optional[str] = None,
) -> Flow:
) -> Flow[P, R]:
"""
Select the only flow in an iterable or a flow specified by name.

Expand Down Expand Up @@ -1716,7 +1691,7 @@ def select_flow(
def load_flow_from_entrypoint(
entrypoint: str,
use_placeholder_flow: bool = True,
) -> Flow:
) -> Flow[P, Any]:
"""
Extract a flow object from a script at an entrypoint by running all of the code in the file.

Expand All @@ -1740,7 +1715,7 @@ def load_flow_from_entrypoint(
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
flow = import_object(entrypoint)
flow: Flow[P, Any] = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
except AttributeError as exc:
raise MissingFlowError(
f"Flow function with name {func_name!r} not found in {path!r}. "
Expand All @@ -1749,13 +1724,13 @@ def load_flow_from_entrypoint(
# If the flow has dependencies that are not installed in the current
# environment, fallback to loading the flow via AST parsing.
if use_placeholder_flow:
flow = safe_load_flow_from_entrypoint(entrypoint)
flow: Optional[Flow[P, Any]] = safe_load_flow_from_entrypoint(entrypoint)
if flow is None:
raise
else:
raise

if not isinstance(flow, Flow):
if not isinstance(flow, Flow): # pyright: ignore[reportUnnecessaryIsInstance]
raise MissingFlowError(
f"Function with name {func_name!r} is not a flow. Make sure that it is "
"decorated with '@flow'."
Expand All @@ -1770,7 +1745,7 @@ def serve(
print_starting_message: bool = True,
limit: Optional[int] = None,
**kwargs: Any,
):
) -> None:
"""
Serve the provided list of deployments.

Expand Down Expand Up @@ -1840,7 +1815,7 @@ async def aserve(
print_starting_message: bool = True,
limit: Optional[int] = None,
**kwargs: Any,
):
) -> None:
"""
Asynchronously serve the provided list of deployments.

Expand Down Expand Up @@ -1945,7 +1920,7 @@ async def load_flow_from_flow_run(
ignore_storage: bool = False,
storage_base_path: Optional[str] = None,
use_placeholder_flow: bool = True,
) -> Flow:
) -> Flow[P, Any]:
"""
Load a flow from the location/script provided in a deployment's storage document.

Expand Down Expand Up @@ -2024,7 +1999,7 @@ async def load_flow_from_flow_run(
return flow


def load_placeholder_flow(entrypoint: str, raises: Exception):
def load_placeholder_flow(entrypoint: str, raises: Exception) -> Flow[P, Any]:
"""
Load a placeholder flow that is initialized with the same arguments as the
flow specified in the entrypoint. If called the flow will raise `raises`.
Expand Down Expand Up @@ -2202,7 +2177,7 @@ def _sanitize_and_load_flow(

def load_flow_arguments_from_entrypoint(
entrypoint: str, arguments: Optional[Union[list[str], set[str]]] = None
) -> dict[Hashable, Any]:
) -> dict[str, Any]:
"""
Extract flow arguments from an entrypoint string.

Expand Down Expand Up @@ -2235,7 +2210,7 @@ def load_flow_arguments_from_entrypoint(
"log_prints",
}

result: dict[Hashable, Any] = {}
result: dict[str, Any] = {}

for decorator in func_def.decorator_list:
if (
Expand All @@ -2248,7 +2223,7 @@ def load_flow_arguments_from_entrypoint(

if isinstance(keyword.value, ast.Constant):
# Use the string value of the argument
result[keyword.arg] = str(keyword.value.value)
result[cast(str, keyword.arg)] = str(keyword.value.value)
continue

# if the arg value is not a raw str (i.e. a variable or expression),
Expand All @@ -2261,7 +2236,7 @@ def load_flow_arguments_from_entrypoint(

try:
evaluated_value = eval(cleaned_value, namespace) # type: ignore
result[keyword.arg] = str(evaluated_value)
result[cast(str, keyword.arg)] = str(evaluated_value)
except Exception as e:
logger.info(
"Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
Expand Down
Loading
Loading