Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] asyncio event loop mismatch when using async actors with map_batches #47734

Open
scottjlee opened this issue Sep 18, 2024 · 0 comments
Open
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues P1 Issue that should be fixed within a few weeks

Comments

@scottjlee
Copy link
Contributor

scottjlee commented Sep 18, 2024

What happened + What you expected to happen

When using async actors with map_batches() and Python 3.9, this error occurs:

RuntimeError: Task <Task pending name='Task-2' coro=<_generate_transform_fn_for_async_map_batches.<locals>.transform_fn.<locals>.process_batch() running at /home/ray/anaconda3/lib/python3.9/site-packages/ray/data/_internal/planner/plan_udf_map_op.py:327> cb=[as_completed.<locals>._on_completion() at /home/ray/anaconda3/lib/python3.9/asyncio/tasks.py:598]> got Future <Future pending> attached to a different loop

With Python 3.11, the error does not occur. It's likely because Python 3.11 introduced multiple improvements to asyncio, and we need to improve the way we handle the event loop so it works with previous versions of Python.

Currently, the workaround is to use Python 3.11+.

Versions / Dependencies

Python 3.9, ray 2.35

Reproduction script

async def task_yield(row):
        return row

    class AsyncActor:
        def __init__(self):
            pass

        async def __call__(self, batch):
            rows = [{"id": np.array([i])} for i in batch["id"]]
            tasks = [asyncio.create_task(task_yield(row)) for row in rows]
            for task in tasks:
                yield await task

    n = 8
    ds = ray.data.range(n, override_num_blocks=n)
    ds = ds.map_batches(
        AsyncActor,
        batch_size=n,
        compute=ray.data.ActorPoolStrategy(size=1, max_tasks_in_flight_per_actor=n),
        concurrency=1,
        max_concurrency=n,
    )

    output = ds.take_all()
    expected_output = [{"id": i} for i in range(n)]
    # Because all tasks are submitted almost simultaneously,
    # the output order may be different compared to the original input.
    assert len(output) == len(expected_output), (len(output), len(expected_output))

Issue Severity

Medium: It is a significant difficulty but I can work around it.

@scottjlee scottjlee added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) P1 Issue that should be fixed within a few weeks data Ray Data-related issues and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 18, 2024
@scottjlee scottjlee changed the title [Data] asyncio RuntimeError when using async actors with map_batches [Data] asyncio event loop mismatch when using async actors with map_batches Sep 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues P1 Issue that should be fixed within a few weeks
Projects
None yet
Development

No branches or pull requests

1 participant