Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] Remove ray.kill in ActorPoolMapOperator #47752

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -640,13 +640,15 @@ def _kill_all_running_actors(self):

def _kill_running_actor(self, actor: ray.actor.ActorHandle):
"""Kill the provided actor and remove it from the pool."""
ray.kill(actor)
del self._num_tasks_in_flight[actor]
del self._actor_locations[actor]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a note that we intend to refactor this with the pause API at some point?

assert actor not in self._pending_actors

def _kill_pending_actor(self, ready_ref: ray.ObjectRef):
"""Kill the provided pending actor and remove it from the pool."""
actor = self._pending_actors.pop(ready_ref)
ray.kill(actor)
assert actor not in self._num_tasks_in_flight
assert actor not in self._actor_locations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we're asserting after modified the state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the assert is for a different dict.
it's to make sure the actor handle is not being ref'ed by any of these 3 dicts


def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
"""Ask Ray for the node id of the given bundle.
Expand Down
15 changes: 15 additions & 0 deletions python/ray/data/tests/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import pytest

import ray
from ray._private.test_utils import wait_for_condition
from ray.data._internal.execution.interfaces.ref_bundle import (
_ref_bundles_iterator_to_block_refs_list,
)
from ray.data._internal.execution.operators.actor_pool_map_operator import _MapWorker
from ray.data.context import DataContext
from ray.data.exceptions import UserCodeException
from ray.data.tests.conftest import * # noqa
Expand Down Expand Up @@ -76,6 +78,19 @@ def test_basic_actors(shutdown_only):
concurrency=(8, 4),
)

# Make sure all actors are dead after dataset execution finishes.
def _all_actors_dead():
actor_table = ray.state.actors()
actors = {
id: actor_info
for actor_info in actor_table.values()
if actor_info["ActorClassName"] == _MapWorker.__name__
}
assert len(actors) > 0
return all(actor_info["State"] == "DEAD" for actor_info in actors.values())

wait_for_condition(_all_actors_dead)


def test_callable_classes(shutdown_only):
ray.init(num_cpus=2)
Expand Down