Skip to content

Commit

Permalink
Merge branch 'master' into redis-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rynewang authored Sep 18, 2024
2 parents f263839 + bc2b26e commit 3bcc147
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@ To understand the following content better, you should understand the difference
* `jobId` (Optional): Defines the submission ID for the Ray job. If not provided, KubeRay generates one automatically. See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details about the submission ID.
* `metadata` (Optional): See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details about the `--metadata-json` option.
* `entrypointNumCpus` / `entrypointNumGpus` / `entrypointResources` (Optional): See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details.
* `backoffLimit` (Optional, added in version 1.2.0): Specifies the number of retries before marking this RayJob as failed. Each retry creates a new RayCluster. The default value is 0.
* Submission configuration
* `submissionMode` (Optional): `submissionMode` specifies how RayJob submits the Ray job to the RayCluster. In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job. In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job. The default value is "K8sJobMode".
* `submitterPodTemplate` (Optional): Defines the Pod template for the submitter Kubernetes Job. This field is only effective when `submissionMode` is "K8sJobMode".
* `RAY_DASHBOARD_ADDRESS` - The KubeRay operator injects this environment variable into the submitter Pod. The value is `$HEAD_SERVICE:$DASHBOARD_PORT`.
* `RAY_JOB_SUBMISSION_ID` - The KubeRay operator injects this environment variable into the submitter Pod. The value is the `RayJob.Status.JobId` of the RayJob.
* Example: `ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...`
* See [ray-job.sample.yaml](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-job.sample.yaml) for more details.
* `submitterConfig` (Optional): Additional configurations for the submitter Kubernetes Job.
* `backoffLimit` (Optional, added in version 1.2.0): The number of retries before marking the submitter Job as failed. The default value is 2.
* Automatic resource cleanup
* `shutdownAfterJobFinishes` (Optional): Determines whether to recycle (delete) the RayCluster after the Ray job finishes. The default value is false.
* `ttlSecondsAfterFinished` (Optional): Only works if `shutdownAfterJobFinishes` is true. The KubeRay operator deletes the RayCluster and the submitter Kubernetes Job `ttlSecondsAfterFinished` seconds after the Ray job finishes. The default value is 0.
* `activeDeadlineSeconds` (Optional): If the RayJob doesn't transition the `JobDeploymentStatus` to `Complete` or `Failed` within `activeDeadlineSeconds`, the KubeRay operator transitions the `JobDeploymentStatus` to `Failed`, citing `DeadlineExceeded` as the reason.
* `DELETE_RAYJOB_CR_AFTER_JOB_FINISHES` (Optional, added in version 1.2.0): Set this environment variable for the KubeRay operator, not the RayJob resource. If you set this environment variable to true, the RayJob custom resource itself is deleted if you also set `shutdownAfterJobFinishes` to true. Note that KubeRay deletes all resources created by the RayJob, including the Kubernetes Job.

## Example: Run a simple Ray job with RayJob

Expand Down
7 changes: 0 additions & 7 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,6 @@ def __init__(
# Mapping from the actor handle to the node ID that the actor is on.
self.actor_to_node_id: Dict["ray.actor.ActorHandle", str] = {}

# Type hints specified by the user for DAG (intermediate) outputs.
self._type_hints = []

# This is set to true when type hint of `transport="nccl"` is used
self._use_default_nccl_group = False
# This is set to the specified custom nccl group
Expand Down Expand Up @@ -744,7 +741,6 @@ def _preprocess(self) -> None:

self.input_task_idx, self.output_task_idx = None, None
self.actor_task_count.clear()
self._type_hints.clear()

nccl_actors: Set["ray.actor.ActorHandle"] = set()

Expand Down Expand Up @@ -950,9 +946,6 @@ def _preprocess(self) -> None:
# Add all readers to the NCCL group.
nccl_actors.add(downstream_actor_handle)

if dag_node.type_hint is not None:
self._type_hints.append(dag_node.type_hint)

# If there were type hints indicating transport via NCCL, initialize
# the NCCL group on the participating actors.
nccl_actors = list(nccl_actors)
Expand Down
9 changes: 4 additions & 5 deletions python/ray/dashboard/modules/job/job_log_storage_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from collections import deque
from typing import Iterator, List, Tuple
from typing import AsyncIterator, List, Tuple

import ray
from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
Expand All @@ -25,10 +25,10 @@ def get_logs(self, job_id: str) -> str:
except FileNotFoundError:
return ""

def tail_logs(self, job_id: str) -> Iterator[List[str]]:
def tail_logs(self, job_id: str) -> AsyncIterator[List[str]]:
return file_tail_iterator(self.get_log_file_path(job_id))

def get_last_n_log_lines(
async def get_last_n_log_lines(
self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
) -> str:
"""
Expand All @@ -39,9 +39,8 @@ def get_last_n_log_lines(
job_id: The id of the job whose logs we want to return
num_log_lines: The number of lines to return.
"""
log_tail_iter = self.tail_logs(job_id)
log_tail_deque = deque(maxlen=num_log_lines)
for lines in log_tail_iter:
async for lines in self.tail_logs(job_id):
if lines is None:
break
else:
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import string
import time
import traceback
from typing import Any, Dict, Iterator, Optional, Union
from typing import Any, AsyncIterator, Dict, Optional, Union

import ray
import ray._private.ray_constants as ray_constants
Expand Down Expand Up @@ -619,12 +619,12 @@ def get_job_logs(self, job_id: str) -> str:
"""Get all logs produced by a job."""
return self._log_client.get_logs(job_id)

async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Return an iterator following the logs of a job."""
if await self.get_job_status(job_id) is None:
raise RuntimeError(f"Job '{job_id}' does not exist.")

for lines in self._log_client.tail_logs(job_id):
async for lines in self._log_client.tail_logs(job_id):
if lines is None:
# Return if the job has exited and there are no new log lines.
status = await self.get_job_status(job_id)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/modules/job/job_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def run(
driver_exit_code=return_code,
)
else:
log_tail = self._log_client.get_last_n_log_lines(self._job_id)
log_tail = await self._log_client.get_last_n_log_lines(self._job_id)
if log_tail is not None and log_tail != "":
message = (
"Job entrypoint command "
Expand Down
4 changes: 2 additions & 2 deletions python/ray/dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import logging
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Union

import packaging.version

Expand Down Expand Up @@ -449,7 +449,7 @@ def get_job_logs(self, job_id: str) -> str:
self._raise_error(r)

@PublicAPI(stability="stable")
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Get an iterator that follows the logs of a job.
Example:
Expand Down
87 changes: 53 additions & 34 deletions python/ray/dashboard/modules/job/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
)


# Polyfill anext() function for Python 3.9 compatibility
# May raise StopAsyncIteration.
async def anext_polyfill(iterator):
return await iterator.__anext__()


# Use the built-in anext() for Python 3.10+, otherwise use our polyfilled function
if sys.version_info < (3, 10):
anext = anext_polyfill


@pytest.fixture
def tmp():
with NamedTemporaryFile() as f:
Expand Down Expand Up @@ -80,32 +91,36 @@ async def test_forward_compatibility(self):


class TestIterLine:
def test_invalid_type(self):
@pytest.mark.asyncio
async def test_invalid_type(self):
with pytest.raises(TypeError, match="path must be a string"):
next(file_tail_iterator(1))
await anext(file_tail_iterator(1))

def test_file_not_created(self, tmp):
@pytest.mark.asyncio
async def test_file_not_created(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None
f = open(tmp, "w")
f.write("hi\n")
f.flush()
assert next(it) is not None
assert await anext(it) is not None

def test_wait_for_newline(self, tmp):
@pytest.mark.asyncio
async def test_wait_for_newline(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")
f.write("no_newline_yet")
assert next(it) is None
assert await anext(it) is None
f.write("\n")
f.flush()
assert next(it) == ["no_newline_yet\n"]
assert await anext(it) == ["no_newline_yet\n"]

def test_multiple_lines(self, tmp):
@pytest.mark.asyncio
async def test_multiple_lines(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -114,13 +129,14 @@ def test_multiple_lines(self, tmp):
s = f"{i}\n"
f.write(s)
f.flush()
assert next(it) == [s]
assert await anext(it) == [s]

assert next(it) is None
assert await anext(it) is None

def test_batching(self, tmp):
@pytest.mark.asyncio
async def test_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -131,13 +147,14 @@ def test_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10)]

assert next(it) is None
assert await anext(it) is None

def test_max_line_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_line_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -148,17 +165,18 @@ def test_max_line_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert next(it) == [f"{i}\n" for i in range(10, 20)]
assert next(it) == [f"{i}\n" for i in range(20, 30)]
assert next(it) == [f"{i}\n" for i in range(30, 40)]
assert next(it) == [f"{i}\n" for i in range(40, 50)]
assert await anext(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10, 20)]
assert await anext(it) == [f"{i}\n" for i in range(20, 30)]
assert await anext(it) == [f"{i}\n" for i in range(30, 40)]
assert await anext(it) == [f"{i}\n" for i in range(40, 50)]

assert next(it) is None
assert await anext(it) is None

def test_max_char_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_char_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -170,31 +188,32 @@ def test_max_char_batching(self, tmp):
f.flush()

# First line will come in a batch of its own
assert next(it) == [f"{'1234567890' * 6000}\n"]
assert await anext(it) == [f"{'1234567890' * 6000}\n"]
# Other 4 lines will be batched together
assert (
next(it)
await anext(it)
== [
f"{'1234567890' * 500}\n",
]
* 4
)
assert next(it) is None
assert await anext(it) is None

def test_delete_file(self):
@pytest.mark.asyncio
async def test_delete_file(self):
with NamedTemporaryFile() as tmp:
it = file_tail_iterator(tmp.name)
f = open(tmp.name, "w")

assert next(it) is None
assert await anext(it) is None

f.write("hi\n")
f.flush()

assert next(it) == ["hi\n"]
assert await anext(it) == ["hi\n"]

# Calls should continue returning None after file deleted.
assert next(it) is None
assert await anext(it) is None


if __name__ == "__main__":
Expand Down
7 changes: 3 additions & 4 deletions python/ray/dashboard/modules/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import logging
import os
import re
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union

from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
Expand Down Expand Up @@ -60,7 +59,7 @@ def redact_url_password(url: str) -> str:
return url


def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
async def file_tail_iterator(path: str) -> AsyncIterator[Optional[List[str]]]:
"""Yield lines from a file as it's written.
Returns lines in batches of up to 10 lines or 20000 characters,
Expand Down Expand Up @@ -114,7 +113,7 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
chunk_char_count += len(curr_line)
else:
# If EOF is reached sleep for 1s before continuing
time.sleep(1)
await asyncio.sleep(1)


async def parse_and_validate_request(
Expand Down
15 changes: 10 additions & 5 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,20 @@ def _consumer_idling(self) -> bool:
return len(self._output_node.outqueue) == 0

def _report_current_usage(self) -> None:
# running_usage is the amount of resources that have been requested but
# are not necessarily available.
# TODO(sofian) https://github.com/ray-project/ray/issues/47520
# We need to split the reported resources into running, pending-scheduling,
# pending-node-assignment.
running_usage = self._resource_manager.get_global_running_usage()
pending_usage = self._resource_manager.get_global_pending_usage()
limits = self._resource_manager.get_global_limits()
resources_status = (
"Running. Resources: "
f"{running_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
f"{running_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
f"{running_usage.object_store_memory_str()}/"
f"{limits.object_store_memory_str()} object_store_memory "
"Active & requested resources: "
f"{running_usage.cpu:.4g} of {limits.cpu:.4g} available CPU, "
f"{running_usage.gpu:.4g} of {limits.gpu:.4g} available GPU, "
f"{running_usage.object_store_memory_str()} of "
f"{limits.object_store_memory_str()} available object_store_memory "
"(pending: "
f"{pending_usage.cpu:.4g} CPU, "
f"{pending_usage.gpu:.4g} GPU)"
Expand Down
Loading

0 comments on commit 3bcc147

Please sign in to comment.