-
@task
async def tsk_copy_all(conf):
defers = [
tsk_copy_data.submit(conf, cls=FirmParam),
tsk_add_props_help.submit(conf),
tsk_copy_goods.submit(conf),
tsk_copy_cats.submit(conf)]
res = wait(defers)
if errs := tuple(r for r in res.done if r.state.is_failed or r.state.is_crashed):
err_tsks_names = ???
print('tasks error:', err_tsks_names)
return Failed(message=f'Not all tasks Ok: {err_tsks_names}' |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
hi @tonal - to read the run names I think you'd have to go back to the API since the run name doesn't live on for exampleimport asyncio
import time
from uuid import UUID
from prefect import flow, get_client, task
from prefect.client.schemas.filters import TaskRunFilter, TaskRunFilterId
from prefect.futures import wait
@task
def errors_on_odd(number: int) -> int:
if number % 2 == 0:
return number
else:
raise ValueError("This is a test error")
async def _task_run_names_from_ids(ids: tuple[UUID, ...]) -> list[str]:
results = []
while len(results) != len(list(ids)):
async with get_client() as client:
results = [
run.name
for run in await client.read_task_runs(
task_run_filter=TaskRunFilter(id=TaskRunFilterId(any_=list(ids)))
)
]
await asyncio.sleep(0.1)
return results
@flow(log_prints=True)
async def some_flow():
futures = errors_on_odd.map(range(10))
if errored_task_run_ids := tuple(
f.task_run_id
for f in wait(futures).done
if f.state.is_failed() or f.state.is_crashed()
):
start = time.monotonic()
err_names = await _task_run_names_from_ids(errored_task_run_ids)
print(f"finished with errors: {err_names}")
print(f"took {time.monotonic() - start} seconds to get task run names")
else:
print("finished without errors")
if __name__ == "__main__":
asyncio.run(some_flow())
maybe we ought to find a way to make this easier (perhaps by putting the run name on also just to call out that here in your example you have
this would always be true because you're checking the truthy-ness of the methods themselves instead of calling them beforeif r.state.is_failed or r.state.is_crashed afterif r.state.is_failed() or r.state.is_crashed() |
Beta Was this translation helpful? Give feedback.
hi @tonal - to read the run names I think you'd have to go back to the API since the run name doesn't live on
state_details
(which is available here isfut.state.state_details
, which may not be ideal because it might take a second for the run info to propagate per #14661for example