diff --git a/doc/source/ray-core/api/exceptions.rst b/doc/source/ray-core/api/exceptions.rst index 4f6d4366f254..b7b2e85d2aeb 100644 --- a/doc/source/ray-core/api/exceptions.rst +++ b/doc/source/ray-core/api/exceptions.rst @@ -34,7 +34,6 @@ Exceptions ray.exceptions.ObjectReconstructionFailedLineageEvictedError ray.exceptions.RayChannelError ray.exceptions.RayChannelTimeoutError - ray.exceptions.RayAdagCapacityExceeded ray.exceptions.RuntimeEnvSetupError ray.exceptions.CrossLanguageError ray.exceptions.RaySystemError diff --git a/python/ray/dag/BUILD b/python/ray/dag/BUILD index 7431e6ce4fad..72c8b4b38b22 100644 --- a/python/ray/dag/BUILD +++ b/python/ray/dag/BUILD @@ -117,7 +117,7 @@ py_test_module_list( ) py_test_module_list( - size = "enormous", + size = "large", files = [ "tests/experimental/test_accelerated_dag.py", ], diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index ef0762222346..1dd7aed7b0ae 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -597,7 +597,6 @@ def __init__( enable_asyncio: bool = False, asyncio_max_queue_size: Optional[int] = None, max_buffered_results: Optional[int] = None, - max_inflight_executions: Optional[int] = None, ): """ Args: @@ -626,10 +625,6 @@ def __init__( executions is beyond the DAG capacity, the new execution would be blocked in the first place; therefore, this limit is only enforced when it is smaller than the DAG capacity. - max_inflight_executions: The maximum number of in-flight executions that - are allowed to be sent to this DAG. Before submitting more requests, - the caller is responsible for calling ray.get to get the result, - otherwise, RayAdagCapacityExceeded is raised. Returns: Channel: A wrapper around ray.ObjectRef. @@ -638,16 +633,6 @@ def __init__( ctx = DAGContext.get_current() - self._enable_asyncio: bool = enable_asyncio - self._fut_queue = asyncio.Queue() - self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size - # TODO(rui): consider unify it with asyncio_max_queue_size - self._max_buffered_results: Optional[int] = max_buffered_results - if self._max_buffered_results is None: - self._max_buffered_results = ctx.max_buffered_results - self._max_inflight_executions = max_inflight_executions - if self._max_inflight_executions is None: - self._max_inflight_executions = ctx.max_inflight_executions self._dag_id = uuid.uuid4().hex self._execution_timeout: Optional[float] = execution_timeout if self._execution_timeout is None: @@ -655,13 +640,8 @@ def __init__( self._buffer_size_bytes: Optional[int] = buffer_size_bytes if self._buffer_size_bytes is None: self._buffer_size_bytes = ctx.buffer_size_bytes - self._default_type_hint: ChannelOutputType = SharedMemoryType( - self._buffer_size_bytes, - # We conservatively set num_shm_buffers to _max_inflight_executions. - # It means that the DAG can be underutilized, but it guarantees there's - # no false positive timeouts. 
-            num_shm_buffers=self._max_inflight_executions,
+        self._default_type_hint: ChannelOutputType = SharedMemoryType(
+            self._buffer_size_bytes
         )
         if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0:
             raise ValueError(
                 "`buffer_size_bytes` must be a positive integer, but got "
@@ -669,6 +649,13 @@
                 f"{self._buffer_size_bytes}"
             )
 
+        self._enable_asyncio: bool = enable_asyncio
+        self._fut_queue = asyncio.Queue()
+        self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
+        # TODO(rui): consider unify it with asyncio_max_queue_size
+        self._max_buffered_results: Optional[int] = max_buffered_results
+        if self._max_buffered_results is None:
+            self._max_buffered_results = ctx.max_buffered_results
         # Used to ensure that the future returned to the
         # caller corresponds to the correct DAG output. I.e.
         # order of futures added to fut_queue should match the
@@ -734,7 +721,7 @@
         self._execution_index: int = 0
         # The maximum index of finished executions.
         # All results with higher indexes have not been generated yet.
-        self._max_finished_execution_index: int = -1
+        self._max_execution_index: int = -1
         self._result_buffer: Dict[int, Any] = {}
 
     def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":
@@ -778,12 +765,6 @@ def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":
 
         self._creator_or_proxy_actor = _get_creator_or_proxy_actor()
 
-    def increment_max_finished_execution_index(self) -> None:
-        """Increment the max finished execution index. It is used to
-        figure out the max number of in-flight requests to the DAG
-        """
-        self._max_finished_execution_index += 1
-
     @property
     def has_single_output(self):
         return self._has_single_output
@@ -1872,20 +1853,6 @@ def run(self):
             monitor.start()
             return monitor
 
-    def raise_if_too_many_inflight_requests(self):
-        num_in_flight_requests = (
-            self._execution_index - self._max_finished_execution_index
-        )
-        if num_in_flight_requests > self._max_inflight_executions:
-            raise ray.exceptions.RayAdagCapacityExceeded(
-                f"There are {num_in_flight_requests} in-flight requests which "
-                "is more than specified _max_inflight_executions of the dag: "
-                f"{self._max_inflight_executions}. Retrieve the output using "
-                "ray.get before submitting more requests or increase "
-                "`max_inflight_executions`. "
-                "`adag.experimental_compile(_max_inflight_executions=...)`"
-            )
-
     def _execute_until(
         self,
         execution_index: int,
@@ -1914,22 +1881,22 @@ def _execute_until(
         if timeout is None:
            timeout = ctx.retrieval_timeout
 
-        while self._max_finished_execution_index < execution_index:
-            if self._max_finished_execution_index + 1 == execution_index:
+        while self._max_execution_index < execution_index:
+            if self._max_execution_index + 1 == execution_index:
                 # Directly fetch and return without buffering
-                self.increment_max_finished_execution_index()
+                self._max_execution_index += 1
                 return self._dag_output_fetcher.read(timeout)
             # Otherwise, buffer the result
             if len(self._result_buffer) >= self._max_buffered_results:
                 raise ValueError(
                     f"Too many buffered results: the allowed max count for "
                     f"buffered results is {self._max_buffered_results}; call ray.get() "
                     "on previous CompiledDAGRefs to free them up from buffer."
) - self.increment_max_finished_execution_index() + self._max_execution_index += 1 start_time = time.monotonic() self._result_buffer[ - self._max_finished_execution_index + self._max_execution_index ] = self._dag_output_fetcher.read(timeout) if timeout != -1: timeout -= time.monotonic() - start_time @@ -1975,7 +1942,6 @@ def execute( else: inp = RayDAGArgs(args=args, kwargs=kwargs) - self.raise_if_too_many_inflight_requests() self._dag_submitter.write(inp, self._execution_timeout) ref = CompiledDAGRef(self, self._execution_index) @@ -2034,7 +2000,6 @@ async def execute_async( else: inp = RayDAGArgs(args=args, kwargs=kwargs) - self.raise_if_too_many_inflight_requests() await self._dag_submitter.write(inp) # Allocate a future that the caller can use to get the result. fut = asyncio.Future() @@ -2070,7 +2035,6 @@ def build_compiled_dag_from_ray_dag( enable_asyncio: bool = False, asyncio_max_queue_size: Optional[int] = None, max_buffered_results: Optional[int] = None, - max_inflight_executions: Optional[int] = None, ) -> "CompiledDAG": compiled_dag = CompiledDAG( execution_timeout, @@ -2078,7 +2042,6 @@ def build_compiled_dag_from_ray_dag( enable_asyncio, asyncio_max_queue_size, max_buffered_results, - max_inflight_executions, ) def _build_compiled_dag(node): diff --git a/python/ray/dag/context.py b/python/ray/dag/context.py index 03eb8915d13b..da33e27dd3f7 100644 --- a/python/ray/dag/context.py +++ b/python/ray/dag/context.py @@ -19,11 +19,6 @@ # The default max_buffered_results is 1000, and the default buffer size is 1 MB. # The maximum memory usage for buffered results is 1 GB. DEFAULT_MAX_BUFFERED_RESULTS = int(os.environ.get("RAY_DAG_max_buffered_results", 1000)) -# The default number of in-flight executions that can be submitted before consuming the -# output. -DEFAULT_MAX_INFLIGHT_EXECUTIONS = int( - os.environ.get("RAY_DAG_max_inflight_executions", 10) -) @DeveloperAPI @@ -65,7 +60,6 @@ class DAGContext: buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS - max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS @staticmethod def get_current() -> "DAGContext": diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 031709382eda..972aee46b0ae 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -166,7 +166,6 @@ def experimental_compile( enable_asyncio: bool = False, _asyncio_max_queue_size: Optional[int] = None, _max_buffered_results: Optional[int] = None, - _max_inflight_executions: Optional[int] = None, ) -> "ray.dag.CompiledDAG": """Compile an accelerated execution path for this DAG. @@ -187,10 +186,6 @@ def experimental_compile( executions is beyond the DAG capacity, the new execution would be blocked in the first place; therefore, this limit is only enforced when it is smaller than the DAG capacity. - _max_inflight_executions: The maximum number of in-flight requests that - are allowed to be sent to this DAG. Before submitting more requests, - the caller is responsible for calling ray.get to clear finished - in-flight requests. Returns: A compiled DAG. 
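
For context between these two hunks: after this change, `experimental_compile` keeps its remaining knobs (`_execution_timeout`, `_buffer_size_bytes`, `enable_asyncio`, `_asyncio_max_queue_size`, `_max_buffered_results`) but no longer accepts `_max_inflight_executions`. A minimal sketch of the compile-and-execute pattern the docstring above describes, mirroring the `Actor.inc` pattern used in the tests in this patch (assumes a running local Ray session; not part of the patch itself):

import ray
from ray.dag import InputNode


@ray.remote
class Actor:
    def __init__(self, init_value):
        self.i = init_value

    def inc(self, x):
        self.i += x
        return self.i


a = Actor.remote(0)
with InputNode() as inp:
    dag = a.inc.bind(inp)

compiled_dag = dag.experimental_compile(_max_buffered_results=100)
ref = compiled_dag.execute(1)  # returns a CompiledDAGRef
# ray.get() on a later ref buffers earlier, unread outputs; once more than
# _max_buffered_results accumulate, a ValueError is raised.
assert ray.get(ref) == 1
compiled_dag.teardown()
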
@@ -223,7 +218,6 @@ def experimental_compile( enable_asyncio, _asyncio_max_queue_size, _max_buffered_results, - _max_inflight_executions, ) def execute( diff --git a/python/ray/dag/dag_node_operation.py b/python/ray/dag/dag_node_operation.py index 69dbd06dfe9f..b5e3974bd5b2 100644 --- a/python/ray/dag/dag_node_operation.py +++ b/python/ray/dag/dag_node_operation.py @@ -36,9 +36,6 @@ def __init__( self.exec_task_idx = exec_task_idx self.type = operation_type - def __repr__(self): - return f"(Task idx: {self.exec_task_idx}, Type: {self.type})" - @total_ordering class _DAGOperationGraphNode: diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index ad413dd974a1..79bb74f759ab 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -14,6 +14,7 @@ import pytest from ray.exceptions import RayChannelError, RayChannelTimeoutError +from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy import ray import ray._private import ray.cluster_utils @@ -22,7 +23,6 @@ from ray._private.utils import ( get_or_create_event_loop, ) -from ray.dag import DAGContext from ray.experimental.channel.torch_tensor_type import TorchTensorType @@ -38,15 +38,6 @@ ] -@pytest.fixture -def temporary_change_timeout(request): - ctx = DAGContext.get_current() - original = ctx.execution_timeout - ctx.execution_timeout = request.param - yield ctx.execution_timeout - ctx.execution_timeout = original - - @ray.remote class Actor: def __init__(self, init_value, fail_after=None, sys_exit=False): @@ -804,6 +795,39 @@ def test_chain_dag(ray_start_regular, num_actors): compiled_dag.teardown() +def test_execution_timeout(ray_start_regular): + a = Actor.remote(0) + with InputNode() as inp: + dag = a.inc.bind(inp) + + compiled_dag = dag.experimental_compile(_execution_timeout=2) + refs = [] + timed_out = False + epsilon = 0.1 # Allow for some slack in the timeout checking + for i in range(5): + try: + start_time = time.monotonic() + ref = compiled_dag.execute(1) + # Hold the refs to avoid get() being called on the ref + # in `__del__()` when it goes out of scope + refs.append(ref) + except RayChannelTimeoutError: + duration = time.monotonic() - start_time + assert duration > 2 - epsilon + assert duration < 2 + epsilon + # The first 3 tasks should complete, and the 4th one + # should block then time out because the max possible + # concurrent executions for the DAG is 3. See the + # following diagram: + # driver -(3)-> a.inc (2) -(1)-> driver + assert i == 3 + timed_out = True + break + assert timed_out + + compiled_dag.teardown() + + def test_get_timeout(ray_start_regular): a = Actor.remote(0) with InputNode() as inp: @@ -1745,99 +1769,57 @@ def test_driver_and_actor_as_readers(ray_start_cluster): dag.experimental_compile() -@pytest.mark.parametrize("temporary_change_timeout", [1], indirect=True) -def test_buffered_inputs(shutdown_only, temporary_change_timeout): - ray.init() +def test_payload_large(ray_start_cluster): + cluster = ray_start_cluster + # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor). + first_node_handle = cluster.add_node(num_cpus=1) + # This node is for the reader. 
+ second_node_handle = cluster.add_node(num_cpus=1) + ray.init(address=cluster.address) + cluster.wait_for_nodes() - MAX_INFLIGHT_EXECUTIONS = 10 - DAG_EXECUTION_TIME = 0.2 + nodes = [first_node_handle.node_id, second_node_handle.node_id] + # We want to check that there are two nodes. Thus, we convert `nodes` to a set and + # then back to a list to remove duplicates. Then we check that the length of `nodes` + # is 2. + nodes = list(set(nodes)) + assert len(nodes) == 2 - # Timeout should be larger than a single execution time. - assert temporary_change_timeout > DAG_EXECUTION_TIME - # Entire execution time (iteration * execution) should be higher than - # the timeout for testing. - assert DAG_EXECUTION_TIME * MAX_INFLIGHT_EXECUTIONS > temporary_change_timeout + def create_actor(node): + return Actor.options( + scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) + ).remote(0) - @ray.remote - class Actor1: - def fwd(self, x): - print("Actor1 fwd") - time.sleep(DAG_EXECUTION_TIME) - return x - - actor1 = Actor1.remote() - - # Since the timeout is 1 second, if buffering is not working, - # it will timeout (0.12s for each dag * MAX_INFLIGHT_EXECUTIONS). - with InputNode() as input_node: - dag = actor1.fwd.bind(input_node) - - # With buffering it should work. - dag = dag.experimental_compile(_max_inflight_executions=MAX_INFLIGHT_EXECUTIONS) - - # Test the regular case. - output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(dag.execute(i)) - for i, ref in enumerate(output_refs): - assert ray.get(ref) == i - - # Test there are more items than max bufcfered inputs. - output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(dag.execute(i)) - with pytest.raises(ray.exceptions.RayAdagCapacityExceeded): - dag.execute(1) - assert len(output_refs) == MAX_INFLIGHT_EXECUTIONS - for i, ref in enumerate(output_refs): - assert ray.get(ref) == i + def get_node_id(self): + return ray.get_runtime_context().get_node_id() - # Make sure it works properly after that. - output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(dag.execute(i)) - for i, ref in enumerate(output_refs): - assert ray.get(ref) == i + driver_node = get_node_id(None) + nodes.remove(driver_node) - dag.teardown() + a = create_actor(nodes[0]) + a_node = ray.get(a.__ray_call__.remote(get_node_id)) + assert a_node == nodes[0] + # Check that the driver and actor are on different nodes. + assert driver_node != a_node - # Test async case - with InputNode() as input_node: - async_dag = actor1.fwd.bind(input_node) + with InputNode() as i: + dag = a.echo.bind(i) - async_dag = async_dag.experimental_compile( - _max_inflight_executions=MAX_INFLIGHT_EXECUTIONS, - enable_asyncio=True, - ) + compiled_dag = dag.experimental_compile() - async def main(): - # Test the regular case. - output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(await async_dag.execute_async(i)) - for i, ref in enumerate(output_refs): - assert await ref == i - - # Test there are more items than max bufcfered inputs. - output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(await async_dag.execute_async(i)) - with pytest.raises(ray.exceptions.RayAdagCapacityExceeded): - await async_dag.execute_async(1) - assert len(output_refs) == MAX_INFLIGHT_EXECUTIONS - for i, ref in enumerate(output_refs): - assert await ref == i - - # Make sure it works properly after that. 
- output_refs = [] - for i in range(MAX_INFLIGHT_EXECUTIONS): - output_refs.append(await async_dag.execute_async(i)) - for i, ref in enumerate(output_refs): - assert await ref == i + # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that + # is a bit larger. + size = 1024 * 1024 * 600 + val = b"x" * size - loop = get_or_create_event_loop() - loop.run_until_complete(main()) - async_dag.teardown() + for i in range(3): + ref = compiled_dag.execute(val) + result = ray.get(ref) + assert result == val + + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() def test_event_profiling(ray_start_regular, monkeypatch): @@ -1888,180 +1870,175 @@ def add_value_to_tensor(self, value: int, tensor: torch.Tensor) -> torch.Tensor: return tensor + value -""" -Accelerated DAGs support the following two cases for the input/output of the graph: - -1. Both the input and output of the graph are the driver process. -2. Both the input and output of the graph are the same actor process. - -This test suite covers the second case. The second case is useful when we use -Ray Serve to deploy the ADAG as a backend. In this case, the Ray Serve replica, -which is an actor, needs to be the input and output of the graph. -""" - - -def test_shared_memory_channel_only(shutdown_only): +class TestActorInputOutput: """ - Replica -> Worker -> Replica - - This test uses shared memory channels for all communication between actors. - """ - - @ray.remote - class Replica: - def __init__(self): - self.w = TestWorker.remote() - with InputNode() as inp: - dag = self.w.add_one.bind(inp) - self.compiled_dag = dag.experimental_compile() + Accelerated DAGs support the following two cases for the input/output of the graph: - def no_op(self, value): - return ray.get(self.compiled_dag.execute(value)) + 1. Both the input and output of the graph are the driver process. + 2. Both the input and output of the graph are the same actor process. - replica = Replica.remote() - ref = replica.no_op.remote(1) - assert ray.get(ref) == 2 - - -def test_intra_process_channel(shutdown_only): - """ - Replica -> Worker -> Worker -> Replica - - This test uses IntraProcessChannel between DAG nodes on the Worker actor. - Communication between the Replica and Worker actors is done through shared - memory channels. + This test suite covers the second case. The second case is useful when we use + Ray Serve to deploy the ADAG as a backend. In this case, the Ray Serve replica, + which is an actor, needs to be the input and output of the graph. """ - @ray.remote - class Replica: - def __init__(self): - self.w = TestWorker.remote() - with InputNode() as inp: - dag = self.w.add_one.bind(inp) - dag = self.w.add_one.bind(dag) - self.compiled_dag = dag.experimental_compile() + def test_shared_memory_channel_only(ray_start_cluster): + """ + Replica -> Worker -> Replica - def call(self, value): - return ray.get(self.compiled_dag.execute(value)) + This test uses shared memory channels for all communication between actors. 
+ """ - replica = Replica.remote() - ref = replica.call.remote(1) - assert ray.get(ref) == 3 + @ray.remote + class Replica: + def __init__(self): + self.w = TestWorker.remote() + with InputNode() as inp: + dag = self.w.add_one.bind(inp) + self.compiled_dag = dag.experimental_compile() + def no_op(self, value): + return ray.get(self.compiled_dag.execute(value)) -def test_multiple_readers_multiple_writers(shutdown_only): - """ - Replica -> Worker1 -> Replica - | | - -> Worker2 - - - All communication in this DAG will be done through shared memory channels. - """ + replica = Replica.remote() + ref = replica.no_op.remote(1) + assert ray.get(ref) == 2 - @ray.remote - class Replica: - def __init__(self): - w1 = TestWorker.remote() - w2 = TestWorker.remote() - with InputNode() as inp: - dag = MultiOutputNode([w1.add_one.bind(inp), w2.add_one.bind(inp)]) - self.compiled_dag = dag.experimental_compile() + def test_intra_process_channel(ray_start_cluster): + """ + Replica -> Worker -> Worker -> Replica - def call(self, value): - ref = self.compiled_dag.execute(value) - return ray.get(ref) + This test uses IntraProcessChannel between DAG nodes on the Worker actor. + Communication between the Replica and Worker actors is done through shared + memory channels. + """ - replica = Replica.remote() - ref = replica.call.remote(1) - assert ray.get(ref) == [2, 2] + @ray.remote + class Replica: + def __init__(self): + self.w = TestWorker.remote() + with InputNode() as inp: + dag = self.w.add_one.bind(inp) + dag = self.w.add_one.bind(dag) + self.compiled_dag = dag.experimental_compile() + def call(self, value): + return ray.get(self.compiled_dag.execute(value)) -def test_multiple_readers_single_writer(shutdown_only): - """ - Replica -> Worker1 -> Worker1 -> Replica - | | - -> Worker2 - + replica = Replica.remote() + ref = replica.call.remote(1) + assert ray.get(ref) == 3 - Communication between DAG nodes on Worker1 is done through IntraProcessChannel. - Communication between different actors is done through shared memory channels. - """ + def test_multiple_readers_multiple_writers(ray_start_cluster): + """ + Replica -> Worker1 -> Replica + | | + -> Worker2 - - @ray.remote - class Replica: - def __init__(self): - w1 = TestWorker.remote() - w2 = TestWorker.remote() - with InputNode() as inp: - branch1 = w1.add_one.bind(inp) - branch2 = w2.add_one.bind(inp) - dag = w1.add.bind(branch1, branch2) - self.compiled_dag = dag.experimental_compile() - - def call(self, value): - return ray.get(self.compiled_dag.execute(value)) - - replica = Replica.remote() - ref = replica.call.remote(1) - assert ray.get(ref) == 4 + All communication in this DAG will be done through shared memory channels. + """ + @ray.remote + class Replica: + def __init__(self): + w1 = TestWorker.remote() + w2 = TestWorker.remote() + with InputNode() as inp: + dag = MultiOutputNode([w1.add_one.bind(inp), w2.add_one.bind(inp)]) + self.compiled_dag = dag.experimental_compile() + + def call(self, value): + ref = self.compiled_dag.execute(value) + return ray.get(ref) + + replica = Replica.remote() + ref = replica.call.remote(1) + assert ray.get(ref) == [2, 2] -def test_single_reader_multiple_writers(shutdown_only): - """ - Replica -> Worker1 -> Worker1 -> Replica - | | - -> Worker2 - + def test_multiple_readers_single_writer(ray_start_cluster): + """ + Replica -> Worker1 -> Worker1 -> Replica + | | + -> Worker2 - - Communication between DAG nodes on Worker1 is done through IntraProcessChannel. 
- Communication between different actors is done through shared memory channels. - """ + Communication between DAG nodes on Worker1 is done through IntraProcessChannel. + Communication between different actors is done through shared memory channels. + """ - @ray.remote - class Replica: - def __init__(self): - w1 = TestWorker.remote() - w2 = TestWorker.remote() - with InputNode() as inp: - dag = w1.add_one.bind(inp) - dag = MultiOutputNode([w1.add_one.bind(dag), w2.add_one.bind(dag)]) - self.compiled_dag = dag.experimental_compile() + @ray.remote + class Replica: + def __init__(self): + w1 = TestWorker.remote() + w2 = TestWorker.remote() + with InputNode() as inp: + branch1 = w1.add_one.bind(inp) + branch2 = w2.add_one.bind(inp) + dag = w1.add.bind(branch1, branch2) + self.compiled_dag = dag.experimental_compile() + + def call(self, value): + return ray.get(self.compiled_dag.execute(value)) + + replica = Replica.remote() + ref = replica.call.remote(1) + assert ray.get(ref) == 4 - def call(self, value): - return ray.get(self.compiled_dag.execute(value)) + def test_single_reader_multiple_writers(ray_start_cluster): + """ + Replica -> Worker1 -> Worker1 -> Replica + | | + -> Worker2 - - replica = Replica.remote() - ref = replica.call.remote(1) - assert ray.get(ref) == [3, 3] + Communication between DAG nodes on Worker1 is done through IntraProcessChannel. + Communication between different actors is done through shared memory channels. + """ + @ray.remote + class Replica: + def __init__(self): + w1 = TestWorker.remote() + w2 = TestWorker.remote() + with InputNode() as inp: + dag = w1.add_one.bind(inp) + dag = MultiOutputNode([w1.add_one.bind(dag), w2.add_one.bind(dag)]) + self.compiled_dag = dag.experimental_compile() + + def call(self, value): + return ray.get(self.compiled_dag.execute(value)) + + replica = Replica.remote() + ref = replica.call.remote(1) + assert ray.get(ref) == [3, 3] -def test_torch_tensor_type(shutdown_only): - """ - This test simulates the pattern of deploying a stable diffusion model with - Ray Serve. The base model takes a prompt and generates an image, which is a - tensor. Then, the refiner model takes the image tensor and the prompt to refine - the image. This test doesn't use the actual model but simulates the data flow. - """ + def test_torch_tensor_type(ray_start_cluster): + """ + This test simulates the pattern of deploying a stable diffusion model with + Ray Serve. The base model takes a prompt and generates an image, which is a + tensor. Then, the refiner model takes the image tensor and the prompt to refine + the image. This test doesn't use the actual model but simulates the data flow. 
+ """ - @ray.remote - class Replica: - def __init__(self): - self._base = TestWorker.remote() - self._refiner = TestWorker.remote() + @ray.remote + class Replica: + def __init__(self): + self._base = TestWorker.remote() + self._refiner = TestWorker.remote() - with ray.dag.InputNode() as inp: - dag = self._refiner.add_value_to_tensor.bind( - inp, - self._base.generate_torch_tensor.bind( + with ray.dag.InputNode() as inp: + dag = self._refiner.add_value_to_tensor.bind( inp, - ).with_type_hint(TorchTensorType()), - ) - self._adag = dag.experimental_compile() - - def call(self, value): - return ray.get(self._adag.execute(value)) - - replica = Replica.remote() - ref = replica.call.remote(5) - assert torch.equal(ray.get(ref), torch.tensor([5, 5, 5, 5, 5])) + self._base.generate_torch_tensor.bind( + inp, + ).with_type_hint(TorchTensorType()), + ) + self._adag = dag.experimental_compile() + + def call(self, value): + return ray.get(self._adag.execute(value)) + + replica = Replica.remote() + ref = replica.call.remote(5) + assert torch.equal(ray.get(ref), torch.tensor([5, 5, 5, 5, 5])) if __name__ == "__main__": diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index a627e0f96fba..dadaf9c7892a 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -44,7 +44,6 @@ def double_and_inc(self, x): return self.i def echo(self, x): - print("ECHO!") self.count += 1 self._fail_if_needed() return x diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 4a3752c31365..0ccd22dfec36 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -845,13 +845,6 @@ class RayChannelTimeoutError(RayChannelError, TimeoutError): pass -@PublicAPI(stability="alpha") -class RayAdagCapacityExceeded(RaySystemError): - """Raised when the accelerated DAG channel's buffer is at max capacity""" - - pass - - RAY_EXCEPTION_TYPES = [ PlasmaObjectNotAvailable, RayError, @@ -879,5 +872,4 @@ class RayAdagCapacityExceeded(RaySystemError): ActorUnavailableError, RayChannelError, RayChannelTimeoutError, - RayAdagCapacityExceeded, ] diff --git a/python/ray/experimental/channel/__init__.py b/python/ray/experimental/channel/__init__.py index dd96938adc62..03e3be2d59e1 100644 --- a/python/ray/experimental/channel/__init__.py +++ b/python/ray/experimental/channel/__init__.py @@ -12,11 +12,7 @@ ) from ray.experimental.channel.gpu_communicator import GPUCommunicator from ray.experimental.channel.intra_process_channel import IntraProcessChannel -from ray.experimental.channel.shared_memory_channel import ( - BufferedSharedMemoryChannel, - Channel, - CompositeChannel, -) +from ray.experimental.channel.shared_memory_channel import Channel, CompositeChannel from ray.experimental.channel.torch_tensor_nccl_channel import TorchTensorNcclChannel __all__ = [ @@ -33,5 +29,4 @@ "TorchTensorNcclChannel", "IntraProcessChannel", "CompositeChannel", - "BufferedSharedMemoryChannel", ] diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index 44da4736a475..a3abc7aaef65 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -10,7 +10,7 @@ from ray.experimental.channel.common import ChannelInterface, ChannelOutputType from ray.experimental.channel.intra_process_channel import IntraProcessChannel from 
ray.experimental.channel.torch_tensor_type import TorchTensorType -from ray.util.annotations import DeveloperAPI, PublicAPI +from ray.util.annotations import PublicAPI # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at @@ -103,17 +103,15 @@ def __init__( class SharedMemoryType(ChannelOutputType): - def __init__(self, buffer_size_bytes: int, *, num_shm_buffers: int): + def __init__(self, buffer_size_bytes: int): """ Args: buffer_size_bytes: The number of bytes to allocate for the object data and metadata. Writes to the channel must produce serialized data and metadata less than or equal to this value. - num_shm_buffers: The number of shared memory buffer per channel. """ super().__init__() self.buffer_size_bytes = buffer_size_bytes - self._num_shm_buffers = num_shm_buffers def create_channel( self, @@ -143,8 +141,7 @@ def create_channel( if self._contains_type.requires_nccl(): cpu_data_typ = SharedMemoryType( - buffer_size_bytes=self.buffer_size_bytes, - num_shm_buffers=1, + buffer_size_bytes=self.buffer_size_bytes ) return NestedTorchTensorNcclChannel( writer, @@ -153,7 +150,7 @@ def create_channel( cpu_data_typ=cpu_data_typ, ) - return CompositeChannel(writer, reader_and_node_list, self._num_shm_buffers) + return CompositeChannel(writer, reader_and_node_list) def set_nccl_group_id(self, group_id: str) -> None: assert self.requires_nccl() @@ -203,9 +200,9 @@ def __init__( assert isinstance(reader, ray.actor.ActorHandle) if typ is None: - typ = SharedMemoryType(DEFAULT_MAX_BUFFER_SIZE, num_shm_buffers=1) + typ = SharedMemoryType(DEFAULT_MAX_BUFFER_SIZE) elif isinstance(typ, int): - typ = SharedMemoryType(typ, num_shm_buffers=1) + typ = SharedMemoryType(typ) if typ.buffer_size_bytes < MIN_BUFFER_SIZE: raise ValueError( @@ -531,102 +528,6 @@ def close(self) -> None: ) -@DeveloperAPI -class BufferedSharedMemoryChannel(ChannelInterface): - """A channel that can be read and written by Ray processes. - - It creates `num_shm_buffers` number of buffers and allows buffered read and - write APIs. I.e., read and write APIs are non-blocking as long as it can write to - next buffer or read from a next buffer. See `read` and `write` APIs for - more details. - - Args: - writer: The actor that may write to the channel. None signifies the driver. - reader_and_node_list: A list of tuples, where each tuple contains a reader - actor handle and the node ID where the actor is located. - num_shm_buffers: Number of shared memory buffers to read/write. - typ: Type information about the values passed through the channel. - Either an integer representing the max buffer size in bytes - allowed, or a SharedMemoryType. - """ - - def __init__( - self, - writer: Optional[ray.actor.ActorHandle], - reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], - num_shm_buffers: int, - typ: Optional[Union[int, SharedMemoryType]] = None, - ): - self._num_shm_buffers = num_shm_buffers - self._buffers = [ - # We use Channel directly as a buffer implementation as - # channel only allows to have 1 shared memory buffer. - Channel(writer, reader_and_node_list, typ) - for _ in range(num_shm_buffers) - ] - # The next index to write from self._buffers. - self._next_write_index = 0 - # The next index to read from self._buffers. - self._next_read_index = 0 - - def ensure_registered_as_writer(self): - """ - Check whether the process is a valid writer. This method must be idempotent. 
- """ - for buffer in self._buffers: - buffer.ensure_registered_as_writer() - - def ensure_registered_as_reader(self): - """ - Check whether the process is a valid reader. This method must be idempotent. - """ - for buffer in self._buffers: - buffer.ensure_registered_as_reader() - - def write(self, value: Any, timeout: Optional[float] = None) -> None: - """Write a value to a channel. - - If the next buffer is available, it returns immediately. If the next - buffer is not read by downstream consumers, it blocks until a buffer is - available to write. If a buffer is not available within timeout, it raises - RayChannelTimeoutError. - """ - # A single channel is not supposed to read and write at the same time. - assert self._next_read_index == 0 - self._buffers[self._next_write_index].write(value, timeout) - self._next_write_index += 1 - self._next_write_index %= self._num_shm_buffers - - def read(self, timeout: Optional[float] = None) -> Any: - """Read a value from a channel. - - If the next buffer is available, it returns immediately. If the next - buffer is not written by an upstream producer, it blocks until a buffer is - available to read. If a buffer is not available within timeout, it raises - RayChannelTimeoutError. - """ - # A single channel is not supposed to read and write at the same time. - assert self._next_write_index == 0 - output = self._buffers[self._next_read_index].read(timeout) - self._next_read_index += 1 - self._next_read_index %= self._num_shm_buffers - return output - - def close(self) -> None: - for buffer in self._buffers: - buffer.close() - - @property - def next_write_index(self): - # Testing only - return self._next_write_index - - @property - def next_read_index(self): - # Testing only - return self._next_read_index - - @PublicAPI(stability="alpha") class CompositeChannel(ChannelInterface): """ @@ -645,7 +546,6 @@ def __init__( self, writer: Optional[ray.actor.ActorHandle], reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], - num_shm_buffers: int, _channel_dict: Optional[Dict[ray.ActorID, ChannelInterface]] = None, _channels: Optional[Set[ChannelInterface]] = None, _writer_registered: bool = False, @@ -653,7 +553,6 @@ def __init__( ): self._writer = writer self._reader_and_node_list = reader_and_node_list - self._num_shm_buffers = num_shm_buffers self._writer_registered = _writer_registered self._reader_registered = _reader_registered # A dictionary that maps the actor ID to the channel object. @@ -685,11 +584,8 @@ def __init__( # There are some remote readers which are not the same Ray actor as the writer. # Create a shared memory channel for the writer and the remote readers. 
if len(remote_reader_and_node_list) != 0: - remote_channel = BufferedSharedMemoryChannel( - self._writer, remote_reader_and_node_list, num_shm_buffers - ) + remote_channel = Channel(self._writer, remote_reader_and_node_list) self._channels.add(remote_channel) - for reader, _ in remote_reader_and_node_list: actor_id = self._get_actor_id(reader) self._channel_dict[actor_id] = remote_channel @@ -729,7 +625,6 @@ def __reduce__(self): return CompositeChannel, ( self._writer, self._reader_and_node_list, - self._num_shm_buffers, self._channel_dict, self._channels, self._writer_registered, diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index feb005c4e37e..ad95c69c7c84 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -269,9 +269,7 @@ def __init__( # metadata channel that will be used to send the shape and dtype of # the tensor to the receiver(s). metadata_type = SharedMemoryType( - buffer_size_bytes=TENSOR_METADATA_SIZE_BYTES, - # We only need 1 buffer per channel. - num_shm_buffers=1, + buffer_size_bytes=TENSOR_METADATA_SIZE_BYTES ) self._meta_channel = metadata_type.create_channel( self._writer, diff --git a/python/ray/experimental/channel/torch_tensor_type.py b/python/ray/experimental/channel/torch_tensor_type.py index 617eb39ccc6e..c37977728b43 100644 --- a/python/ray/experimental/channel/torch_tensor_type.py +++ b/python/ray/experimental/channel/torch_tensor_type.py @@ -126,7 +126,6 @@ def create_channel( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], _torch_tensor_allocator: Optional["TorchTensorAllocator"] = None, ) -> type: - if self.requires_nccl(): from ray.experimental.channel.torch_tensor_nccl_channel import ( TorchTensorNcclChannel, diff --git a/python/ray/experimental/compiled_dag_ref.py b/python/ray/experimental/compiled_dag_ref.py index f649f752f1ce..e873cc00ff1e 100644 --- a/python/ray/experimental/compiled_dag_ref.py +++ b/python/ray/experimental/compiled_dag_ref.py @@ -149,7 +149,6 @@ def __await__(self): self._fut = None return_vals = yield from fut.__await__() - self._dag.increment_max_finished_execution_index() return _process_return_vals( return_vals, diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py index df3379df173d..bfb3d1e890f1 100644 --- a/python/ray/tests/test_channel.py +++ b/python/ray/tests/test_channel.py @@ -1,5 +1,4 @@ # coding: utf-8 -import pickle import logging import os import sys @@ -907,7 +906,7 @@ def pass_channel(self, channel): self._chan = channel def create_composite_channel(self, writer, reader_and_node_list): - self._chan = ray_channel.CompositeChannel(writer, reader_and_node_list, 10) + self._chan = ray_channel.CompositeChannel(writer, reader_and_node_list) return self._chan def read(self): @@ -922,7 +921,7 @@ def write(self, value): node2 = get_actor_node_id(actor2) # Create a channel to communicate between driver process and actor1. 
- driver_to_actor1_channel = ray_channel.CompositeChannel(None, [(actor1, node1)], 10) + driver_to_actor1_channel = ray_channel.CompositeChannel(None, [(actor1, node1)]) ray.get(actor1.pass_channel.remote(driver_to_actor1_channel)) driver_to_actor1_channel.write("hello") assert ray.get(actor1.read.remote()) == "hello" @@ -979,7 +978,7 @@ def pass_channel(self, channel): self._chan = channel def create_composite_channel(self, writer, reader_and_node_list): - self._chan = ray_channel.CompositeChannel(writer, reader_and_node_list, 10) + self._chan = ray_channel.CompositeChannel(writer, reader_and_node_list) return self._chan def read(self): @@ -995,7 +994,7 @@ def write(self, value): # The driver writes data to CompositeChannel and actor1 and actor2 read it. driver_output_channel = ray_channel.CompositeChannel( - None, [(actor1, node1), (actor2, node2)], 10 + None, [(actor1, node1), (actor2, node2)] ) ray.get(actor1.pass_channel.remote(driver_output_channel)) ray.get(actor2.pass_channel.remote(driver_output_channel)) @@ -1322,78 +1321,6 @@ def create_actor(node): ] -def test_buffered_channel(shutdown_only): - """Test buffered shared memory channel.""" - BUFFER_SIZE = 5 - - @ray.remote(num_cpus=1) - class Actor: - def __init__(self): - self.write_index = 0 - - def setup(self, driver_actor): - self._channel = ray_channel.BufferedSharedMemoryChannel( - ray.get_runtime_context().current_actor, - [(driver_actor, get_actor_node_id(driver_actor))], - BUFFER_SIZE, - typ=1000, - ) - - def get_channel(self): - return self._channel - - def write(self, i, timeout=None) -> bool: - """Write to a channel Return False if channel times out. - Return true otherwise. - """ - self.write_index += 1 - try: - self._channel.write(i, timeout) - except ray.exceptions.RayChannelTimeoutError: - return False - assert self._channel.next_write_index == self.write_index % BUFFER_SIZE - return True - - a = Actor.remote() - ray.get(a.setup.remote(create_driver_actor())) - chan = ray.get(a.get_channel.remote()) - - print("Test basic.") - # Iterate more than buffer size to make sure it works over and over again. - read_idx = 0 - for i in range(BUFFER_SIZE * 3): - read_idx += 1 - assert ray.get(a.write.remote(i)) - assert chan.read() == i - assert chan.next_read_index == read_idx % BUFFER_SIZE - - print("Test Write timeout.") - # Test write timeout. - for i in range(BUFFER_SIZE): - # fill the buffer withtout read. - ray.get(a.write.remote(i)) - # timeout because all the buffer is exhausted without being consumed. - assert ray.get(a.write.remote(1, timeout=1)) is False - - print("Test Read timeout.") - # Test read timeout. - for i in range(BUFFER_SIZE): - # This reads all previous writes. - assert chan.read() == i - # This read times out because there's no new write, and the call blocks. - with pytest.raises(ray.exceptions.RayChannelTimeoutError): - chan.read(timeout=1) - - print("Test serialization/deserialization works") - deserialized = pickle.loads(pickle.dumps(chan)) - assert len(chan._buffers) == len(deserialized._buffers) - for i in range(len(chan._buffers)): - assert ( - deserialized._buffers[i]._writer._actor_id - == chan._buffers[i]._writer._actor_id - ) - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/serve/serve_40797.log b/serve/serve_40797.log deleted file mode 100644 index e69de29bb2d1..000000000000
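
For reviewers who want the shape of what was removed: `BufferedSharedMemoryChannel` (deleted from shared_memory_channel.py above) round-robins over `num_shm_buffers` single-slot channels, letting the writer run up to that many messages ahead of the reader before blocking. The deleted comment in compiled_dag_node.py explains why `num_shm_buffers` was conservatively set to `_max_inflight_executions`: the DAG can be underutilized, but there are no false-positive timeouts. A self-contained sketch of the indexing scheme, with `queue.Queue(maxsize=1)` standing in for the real single-slot shared-memory `Channel` (the class name and queue stand-in are illustrative, not Ray APIs):

import queue
from typing import Any, List, Optional


class BufferedChannelSketch:
    """Round-robin writes/reads over num_buffers single-slot channels."""

    def __init__(self, num_buffers: int):
        self._buffers: List[queue.Queue] = [
            queue.Queue(maxsize=1) for _ in range(num_buffers)
        ]
        self._next_write_index = 0
        self._next_read_index = 0

    def write(self, value: Any, timeout: Optional[float] = None) -> None:
        # Blocks if the target slot still holds an unread value; the real
        # channel raised RayChannelTimeoutError (queue.Full in this sketch).
        self._buffers[self._next_write_index].put(value, timeout=timeout)
        self._next_write_index = (self._next_write_index + 1) % len(self._buffers)

    def read(self, timeout: Optional[float] = None) -> Any:
        # Blocks until the target slot is written (queue.Empty on timeout).
        value = self._buffers[self._next_read_index].get(timeout=timeout)
        self._next_read_index = (self._next_read_index + 1) % len(self._buffers)
        return value
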
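Likewise, the removed backpressure check (`raise_if_too_many_inflight_requests` plus the `RayAdagCapacityExceeded` exception) was plain counter arithmetic: `_execution_index` advances on every `execute()`, `_max_finished_execution_index` advances whenever a result is consumed, and submission fails once the difference exceeds `max_inflight_executions`. A condensed, standalone sketch of that deleted logic (the class and `RuntimeError` are stand-ins; the real code lived on `CompiledDAG` and raised `ray.exceptions.RayAdagCapacityExceeded`):

class InflightBookkeeping:
    """Tracks submissions vs. consumed results for a compiled DAG."""

    def __init__(self, max_inflight_executions: int):
        self.max_inflight_executions = max_inflight_executions
        self.execution_index = 0                 # bumped on every execute()
        self.max_finished_execution_index = -1   # bumped per consumed result

    def raise_if_too_many_inflight_requests(self) -> None:
        num_in_flight = self.execution_index - self.max_finished_execution_index
        if num_in_flight > self.max_inflight_executions:
            # The deleted code raised ray.exceptions.RayAdagCapacityExceeded.
            raise RuntimeError(
                f"{num_in_flight} in-flight requests exceed "
                f"max_inflight_executions={self.max_inflight_executions}; "
                "call ray.get() on earlier results before submitting more."
            )

    def on_execute(self) -> None:
        # The deleted code ran the check before writing to the DAG input
        # channel, then incremented the index when the ref was created.
        self.raise_if_too_many_inflight_requests()
        self.execution_index += 1

    def on_result_consumed(self) -> None:
        self.max_finished_execution_index += 1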