Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[aDAG] support buffered input (#47272)" #47611

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/source/ray-core/api/exceptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ Exceptions
ray.exceptions.ObjectReconstructionFailedLineageEvictedError
ray.exceptions.RayChannelError
ray.exceptions.RayChannelTimeoutError
ray.exceptions.RayAdagCapacityExceeded
ray.exceptions.RuntimeEnvSetupError
ray.exceptions.CrossLanguageError
ray.exceptions.RaySystemError
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dag/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ py_test_module_list(
)

py_test_module_list(
size = "enormous",
size = "large",
files = [
"tests/experimental/test_accelerated_dag.py",
],
Expand Down
65 changes: 14 additions & 51 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,6 @@ def __init__(
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
):
"""
Args:
Expand Down Expand Up @@ -626,10 +625,6 @@ def __init__(
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions that
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to retrieve the results;
otherwise, RayAdagCapacityExceeded is raised.

Returns:
Channel: A wrapper around ray.ObjectRef.
Expand All @@ -638,37 +633,29 @@ def __init__(

ctx = DAGContext.get_current()

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unifying it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
self._max_inflight_executions = max_inflight_executions
if self._max_inflight_executions is None:
self._max_inflight_executions = ctx.max_inflight_executions
self._dag_id = uuid.uuid4().hex
self._execution_timeout: Optional[float] = execution_timeout
if self._execution_timeout is None:
self._execution_timeout = ctx.execution_timeout
self._buffer_size_bytes: Optional[int] = buffer_size_bytes
if self._buffer_size_bytes is None:
self._buffer_size_bytes = ctx.buffer_size_bytes

self._default_type_hint: ChannelOutputType = SharedMemoryType(
self._buffer_size_bytes,
# We conservatively set num_shm_buffers to _max_inflight_executions.
# This means that the DAG can be underutilized, but it guarantees there are
# no false-positive timeouts.
num_shm_buffers=self._max_inflight_executions,
self._buffer_size_bytes
)
if not isinstance(self._buffer_size_bytes, int) or self._buffer_size_bytes <= 0:
raise ValueError(
"`buffer_size_bytes` must be a positive integer, found "
f"{self._buffer_size_bytes}"
)

self._enable_asyncio: bool = enable_asyncio
self._fut_queue = asyncio.Queue()
self._asyncio_max_queue_size: Optional[int] = asyncio_max_queue_size
# TODO(rui): consider unifying it with asyncio_max_queue_size
self._max_buffered_results: Optional[int] = max_buffered_results
if self._max_buffered_results is None:
self._max_buffered_results = ctx.max_buffered_results
# Used to ensure that the future returned to the
# caller corresponds to the correct DAG output. I.e.
# order of futures added to fut_queue should match the
Expand Down Expand Up @@ -734,7 +721,7 @@ def __init__(
self._execution_index: int = 0
# The maximum index of finished executions.
# All results with higher indexes have not been generated yet.
self._max_finished_execution_index: int = -1
self._max_execution_index: int = -1
self._result_buffer: Dict[int, Any] = {}

def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":
Expand Down Expand Up @@ -778,12 +765,6 @@ def _get_creator_or_proxy_actor() -> "ray.actor.ActorHandle":

self._creator_or_proxy_actor = _get_creator_or_proxy_actor()

def increment_max_finished_execution_index(self) -> None:
"""Increment the max finished execution index. It is used to
figure out the number of in-flight requests to the DAG.
"""
self._max_finished_execution_index += 1

@property
def has_single_output(self):
return self._has_single_output
Expand Down Expand Up @@ -1872,20 +1853,6 @@ def run(self):
monitor.start()
return monitor

def raise_if_too_many_inflight_requests(self):
num_in_flight_requests = (
self._execution_index - self._max_finished_execution_index
)
if num_in_flight_requests > self._max_inflight_executions:
raise ray.exceptions.RayAdagCapacityExceeded(
f"There are {num_in_flight_requests} in-flight requests which "
"is more than specified _max_inflight_executions of the dag: "
f"{self._max_inflight_executions}. Retrieve the output using "
"ray.get before submitting more requests or increase "
"`max_inflight_executions`. "
"`adag.experimental_compile(_max_inflight_executions=...)`"
)

def _execute_until(
self,
execution_index: int,
Expand Down Expand Up @@ -1914,10 +1881,10 @@ def _execute_until(
if timeout is None:
timeout = ctx.retrieval_timeout

while self._max_finished_execution_index < execution_index:
if self._max_finished_execution_index + 1 == execution_index:
while self._max_execution_index < execution_index:
if self._max_execution_index + 1 == execution_index:
# Directly fetch and return without buffering
self.increment_max_finished_execution_index()
self._max_execution_index += 1
return self._dag_output_fetcher.read(timeout)
# Otherwise, buffer the result
if len(self._result_buffer) >= self._max_buffered_results:
Expand All @@ -1926,10 +1893,10 @@ def _execute_until(
f"buffered results is {self._max_buffered_results}; call ray.get() "
"on previous CompiledDAGRefs to free them up from buffer."
)
self.increment_max_finished_execution_index()
self._max_execution_index += 1
start_time = time.monotonic()
self._result_buffer[
self._max_finished_execution_index
self._max_execution_index
] = self._dag_output_fetcher.read(timeout)
if timeout != -1:
timeout -= time.monotonic() - start_time
Expand Down Expand Up @@ -1975,7 +1942,6 @@ def execute(
else:
inp = RayDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
self._dag_submitter.write(inp, self._execution_timeout)

ref = CompiledDAGRef(self, self._execution_index)
Expand Down Expand Up @@ -2034,7 +2000,6 @@ async def execute_async(
else:
inp = RayDAGArgs(args=args, kwargs=kwargs)

self.raise_if_too_many_inflight_requests()
await self._dag_submitter.write(inp)
# Allocate a future that the caller can use to get the result.
fut = asyncio.Future()
Expand Down Expand Up @@ -2070,15 +2035,13 @@ def build_compiled_dag_from_ray_dag(
enable_asyncio: bool = False,
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
) -> "CompiledDAG":
compiled_dag = CompiledDAG(
execution_timeout,
buffer_size_bytes,
enable_asyncio,
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
)

def _build_compiled_dag(node):
Expand Down
6 changes: 0 additions & 6 deletions python/ray/dag/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
# The default max_buffered_results is 1000, and the default buffer size is 1 MB.
# The maximum memory usage for buffered results is 1 GB.
DEFAULT_MAX_BUFFERED_RESULTS = int(os.environ.get("RAY_DAG_max_buffered_results", 1000))
# The default number of in-flight executions that can be submitted before consuming the
# output.
DEFAULT_MAX_INFLIGHT_EXECUTIONS = int(
os.environ.get("RAY_DAG_max_inflight_executions", 10)
)


@DeveloperAPI
Expand Down Expand Up @@ -65,7 +60,6 @@ class DAGContext:
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS

@staticmethod
def get_current() -> "DAGContext":
Expand Down
6 changes: 0 additions & 6 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ def experimental_compile(
enable_asyncio: bool = False,
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
) -> "ray.dag.CompiledDAG":
"""Compile an accelerated execution path for this DAG.

Expand All @@ -187,10 +186,6 @@ def experimental_compile(
executions is beyond the DAG capacity, the new execution would
be blocked in the first place; therefore, this limit is only
enforced when it is smaller than the DAG capacity.
_max_inflight_executions: The maximum number of in-flight requests that
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to clear finished
in-flight requests.

Returns:
A compiled DAG.
Expand Down Expand Up @@ -223,7 +218,6 @@ def experimental_compile(
enable_asyncio,
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
)

def execute(
Expand Down
3 changes: 0 additions & 3 deletions python/ray/dag/dag_node_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ def __init__(
self.exec_task_idx = exec_task_idx
self.type = operation_type

def __repr__(self):
return f"(Task idx: {self.exec_task_idx}, Type: {self.type})"


@total_ordering
class _DAGOperationGraphNode:
Expand Down
Loading
Loading