From 06508b801b6b1597c9a2d6afebbe2585c03f09c5 Mon Sep 17 00:00:00 2001
From: Michael Adkins
Date: Wed, 17 Aug 2022 10:36:35 -0500
Subject: [PATCH] Improve robustness of subprocess text streaming

Previously, this utility would call blocking writes to stderr and
stdout directly. This appears to introduce the possibility of race
conditions and blocked event loops when many processes are run
concurrently. Since we use these utilities to launch flows in parallel
processes from the agent, it is important that they are robust to
concurrency.

`wrap_file` _may_ not be thread safe, but this is still an improvement
from where we were. There is not a clear suggested pattern for this,
see extensive discussion at
https://github.com/python-trio/trio/issues/174.
---
 src/prefect/utilities/processutils.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/prefect/utilities/processutils.py b/src/prefect/utilities/processutils.py
index 46729b5c59b2..c17250256b5e 100644
--- a/src/prefect/utilities/processutils.py
+++ b/src/prefect/utilities/processutils.py
@@ -8,7 +8,7 @@
 import anyio.abc
 from anyio.streams.text import TextReceiveStream, TextSendStream
 
-TextSink = Union[TextIO, TextSendStream]
+TextSink = Union[anyio.AsyncFile, TextIO, TextSendStream]
 
 
 @asynccontextmanager
@@ -101,9 +101,17 @@ async def consume_process_output(
 
 
 async def stream_text(source: TextReceiveStream, sink: Optional[TextSink]):
+    if isinstance(sink, TextIOBase):
+        # Convert the blocking sink to an async-compatible object
+        sink = anyio.wrap_file(sink)
+
     async for item in source:
         if isinstance(sink, TextSendStream):
             await sink.send(item)
-        elif isinstance(sink, TextIOBase):
-            sink.write(item)
-            sink.flush()
+        elif isinstance(sink, anyio.AsyncFile):
+            await sink.write(item)
+            await sink.flush()
+        elif sink is None:
+            pass  # Consume the item but perform no action
+        else:
+            raise TypeError(f"Unsupported sink type {type(sink).__name__}")