
[core][aDAG] Clean up shutdown path #47702

Merged 14 commits into ray-project:master on Oct 22, 2024

Conversation

@rkooo567 (Contributor) commented Sep 17, 2024

Why are these changes needed?

Various fixes, including #47685, to clean up the shutdown path.

  • Make teardown idempotent.
  • Remove the non-blocking teardown(wait=False). It is prone to all kinds of weird errors because we don't synchronize properly before shutdown.
  • Previously, we ran teardown in __del__. That works well at runtime but not at interpreter shutdown, because destruction order is not guaranteed then. We should use an atexit handler instead. To fix this, I keep track of all compiled DAGs created (via weakref) and run teardown inside the shutdown API, which is called when the interpreter shuts down.
  • Fix asyncio read/write being blocked and joined forever. We retry read/write every 1 second and check sys.is_finalizing(), which becomes True when the interpreter is exiting. We can't rely on the atexit teardown handler here because asyncio read/write runs in a thread pool, and the thread pool is joined before regular atexit handlers are executed. See https://github.com/python/cpython/blob/8f82d9aa2191db7826bb7a453fe06ce65f966cf8/Lib/concurrent/futures/thread.py#L37 (that internal atexit handler always runs before Python's regular atexit handlers).
  • Change teardown logs to debug level so they aren't printed unless necessary.

Related issue number

Closes #47685 (comment)

Closes #47628.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567 rkooo567 added the go add ONLY when ready to merge, run all tests label Sep 17, 2024
@stephanie-wang (Contributor) left a comment

LGTM. Can you add a description of the changes to the PR text?

return [c.read() for c in self._input_channels]
results = []
for c in self._input_channels:
try:
@rkooo567 (Contributor, Author):

add while loop

@rkooo567 rkooo567 changed the title [WIP][core][aDAG] Clean up shutdown path [core][aDAG] Clean up shutdown path Sep 19, 2024
@ruisearch42 (Contributor) left a comment

Otherwise LGTM

@stephanie-wang (Contributor):

This might also close #47413?

@rkooo567 (Contributor, Author):

Yeah I actually asked him to try... let's see how it goes

@anyscalesam (Contributor):

Note - in standup today this is blocking other PRs from merging in #compiled-graph cc @stephanie-wang

@rkooo567 (Contributor, Author) commented Oct 8, 2024

ah yeah sorry about that... Feel free to push code directly and merge it. I think it should be almost ready

@rkooo567 (Contributor, Author):

Fixed the issue. Should be ready to be merged tomorrow.

@rkooo567 (Contributor, Author):

@stephanie-wang @ruisearch42 can you give an approval for merge?

@ruisearch42 (Contributor) left a comment

Otherwise LGTM

@@ -1874,6 +1874,11 @@ def shutdown(_exiting_interpreter: bool = False):
and false otherwise. If we are exiting the interpreter, we will
wait a little while to print any extra error messages.
"""
# Make sure to clean up compiled dag node if exists.
Contributor: Update docstring as well?

@rkooo567 (Contributor, Author): Feels like it is kind of an implementation detail?

Comment on lines +385 to +390
for c in self._input_channels:
exiting = retry_and_check_interpreter_exit(
lambda: results.append(c.read(timeout=1))
)
if exiting:
break
Contributor:

Just to understand, the original code hangs because it waits inside C++ and therefore does not respect interpreter exiting?

@rkooo567 (Contributor, Author):

No, it is due to the asyncio fix described in the PR text: asyncio reads/writes were blocked and joined forever. We retry read/write every 1 second and check sys.is_finalizing(), which becomes True when the interpreter is exiting. We can't rely on the atexit teardown handler because asyncio read/write runs in a thread pool, and the thread pool is joined before regular atexit handlers are executed. See https://github.com/python/cpython/blob/8f82d9aa2191db7826bb7a453fe06ce65f966cf8/Lib/concurrent/futures/thread.py#L37 (that internal atexit handler always runs before Python's regular atexit handlers).
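The retry pattern discussed here can be sketched like this. It is a minimal illustration, not Ray's actual implementation: the helper name matches the snippet quoted in the diff above, but the `RayChannelTimeoutError` exception type is an assumption standing in for whatever timeout error `c.read(timeout=1)` raises.

```python
import sys
from typing import Callable


class RayChannelTimeoutError(Exception):
    """Illustrative stand-in for the timeout raised by a channel read/write."""


def retry_and_check_interpreter_exit(f: Callable[[], None]) -> bool:
    """Retry f until it succeeds, bailing out if the interpreter is exiting.

    f should block for at most about 1 second per attempt (e.g. a channel
    read with timeout=1). Returns True if we stopped because the interpreter
    is finalizing, False if f eventually succeeded.
    """
    while True:
        try:
            f()
            return False
        except RayChannelTimeoutError:
            # sys.is_finalizing() becomes True while the interpreter is
            # shutting down. We poll it here because this code runs in an
            # executor thread, and those threads are joined before regular
            # atexit handlers run, so an atexit-based teardown never fires
            # in time to unblock us.
            if sys.is_finalizing():
                return True
```

The caller then breaks out of its loop over input channels as soon as the helper reports that the interpreter is exiting, instead of blocking forever on a read that will never complete.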

@rkooo567 (Contributor, Author):

I will address the comments in a follow-up.

@rkooo567 rkooo567 merged commit bc99a3b into ray-project:master Oct 22, 2024
5 checks passed
akyang-anyscale pushed a commit to akyang-anyscale/ray that referenced this pull request Oct 22, 2024
ruisearch42 pushed a commit to ruisearch42/ray that referenced this pull request Oct 22, 2024
aslonnie pushed a commit that referenced this pull request Oct 23, 2024

## Why are these changes needed?

This PR fixes test_torch_tensor_dag_gpu with the following quick patches:
1. Revert #47702 , otherwise there is segfault
2. Move TestNcclGroup to an inner class in the tests; otherwise the following error occurs:

```
(TorchTensorWorker pid=2261373) No module named 'test_torch_tensor_dag'
(TorchTensorWorker pid=2261373) Traceback (most recent call last):
(TorchTensorWorker pid=2261373)   File "/home/ubuntu/ray/python/ray/_private/serialization.py", line 460, in deserialize_objects
(TorchTensorWorker pid=2261373)     obj = self._deserialize_object(data, metadata, object_ref)
(TorchTensorWorker pid=2261373)   File "/home/ubuntu/ray/python/ray/_private/serialization.py", line 317, in _deserialize_object
(TorchTensorWorker pid=2261373)     return self._deserialize_msgpack_data(data, metadata_fields)
(TorchTensorWorker pid=2261373)   File "/home/ubuntu/ray/python/ray/_private/serialization.py", line 272, in _deserialize_msgpack_data
(TorchTensorWorker pid=2261373)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(TorchTensorWorker pid=2261373)   File "/home/ubuntu/ray/python/ray/_private/serialization.py", line 262, in _deserialize_pickle5_data
(TorchTensorWorker pid=2261373)     obj = pickle.loads(in_band)
(TorchTensorWorker pid=2261373) ModuleNotFoundError: No module named 'test_torch_tensor_dag'
```
stephanie-wang pushed a commit that referenced this pull request Oct 25, 2024
Jay-ju pushed a commit to Jay-ju/ray that referenced this pull request Nov 5, 2024
Jay-ju pushed a commit to Jay-ju/ray that referenced this pull request Nov 5, 2024
Jay-ju pushed a commit to Jay-ju/ray that referenced this pull request Nov 5, 2024
Labels
go add ONLY when ready to merge, run all tests

Successfully merging this pull request may close these issues.

[core][aDAG] asyncio run hangs upon shutdown
[aDAG] [Tests] Seg fault when running failed tests back-to-back
5 participants