From c7c5a5afef4fcb9b6694d921afe0cd474e2e9cc6 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 00:31:23 -0700 Subject: [PATCH 01/25] wip Signed-off-by: Nikita Vemuri --- BUILD.bazel | 18 +++ .../ray/_private/event/export_event_logger.py | 128 ++++++++++++++++++ .../modules/event/tests/test_event.py | 31 +++++ src/ray/protobuf/BUILD | 7 +- 4 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 python/ray/_private/event/export_event_logger.py diff --git a/BUILD.bazel b/BUILD.bazel index 92d44ddccbd4..735c14246262 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2500,6 +2500,16 @@ filegroup( ], ) +filegroup( + name = "export_api_py_proto", + srcs = [ + "//src/ray/protobuf:export_event_py_proto", + "//src/ray/protobuf:export_task_event_py_proto", + "//src/ray/protobuf:export_node_event_py_proto", + "//src/ray/protobuf:export_actor_event_py_proto", + ], +) + # This is a dummy test dependency that causes the python tests to be # re-run if any of these files changes. 
py_library( @@ -2529,6 +2539,12 @@ copy_to_workspace( dstdir = "python/ray/serve/generated", ) +copy_to_workspace( + name = "cp_export_api_py_proto", + srcs = [":export_api_py_proto"], + dstdir = "python/ray/core/generated/export_api", +) + copy_to_workspace( name = "cp_redis", srcs = [ @@ -2561,6 +2577,7 @@ genrule( srcs = [ ":cp_all_py_proto", ":cp_serve_py_proto", + ":cp_export_api_py_proto", ], outs = ["install_py_proto.out"], cmd = """ @@ -2569,6 +2586,7 @@ genrule( # shellcheck disable=SC2006 files=( `ls python/ray/core/generated/*_pb2*.py` \ + `ls python/ray/core/generated/export_api/*_pb2*.py` \ `ls python/ray/serve/generated/*_pb2*.py` \ ) sed -i -E 's/from src.ray.protobuf/from ./' "$${files[@]}" diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py new file mode 100644 index 000000000000..b56a289f2530 --- /dev/null +++ b/python/ray/_private/event/export_event_logger.py @@ -0,0 +1,128 @@ +import logging +import pathlib +import json +import random +import string +import socket +import os +import threading + +from typing import Dict, Optional, Union +from datetime import datetime + +from google.protobuf.json_format import Parse + +import ray.core.generated.export_api as export_api_protos +from ray.core.generated.export_api.export_event_pb2 import ExportEvent +from ray._private.protobuf_compat import message_to_dict + +global_logger = logging.getLogger(__name__) + +# TODO: update to only contain types that write python events +ExportEventDataType = Union[ + export_api_protos.export_task_event_pb2.ExportTaskEventData, + export_api_protos.export_node_data_pb2.ExportNodeData, + export_api_protos.export_actor_data_pb2.ExportActorData, +] + +def get_event_id(): + return "".join([random.choice(string.hexdigits) for _ in range(18)]) + +class ExportEventLoggerAdapter: + def __init__(self, source: ExportEvent.SourceType, logger: logging.Logger): + """Adapter for the Python logger that's used to emit export 
events. + """ + self.logger = logger + self.source = source + + def send_event(self, event_data: ExportEventDataType): + # NOTE: Python logger is thread-safe, + # so we don't need to protect it using locks. + try: + event = self._create_export_event(event_data) + except TypeError: + # TODO: log that invalid event_data type passed + return + + event_as_str = self._export_event_to_string(event) + + self.logger.info(event_as_str) + # Force flush so that we won't lose events + self.logger.handlers[0].flush() + + def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent: + event = ExportEvent() + event.event_id = get_event_id() + event.timestamp = int(datetime.now().timestamp()) + if type(event_data) is export_api_protos.export_task_event_pb2.ExportTaskEventData: + event.task_event_data.CopyFrom(event_data) + event.source_type = ExportEvent.SourceType.EXPORT_TASK + else: + raise TypeError("Invalid event_data type") + if event.source_type != self.source: + # TODO: log that source type mismatch + pass + return event + + def _export_event_to_string(self, event: ExportEvent) -> str: + event_data_json = {} + proto_to_dict_options = { + "always_print_fields_with_no_presence":True, + "preserving_proto_field_name":True, + "use_integers_for_enums":False + } + event_data_field_set = event.WhichOneof('event_data') + if event_data_field_set: + event_data_json = message_to_dict( + getattr(event, event_data_field_set), + **proto_to_dict_options, + ) + else: + # TODO: log that no event data passed + pass + event_json = { + "event_id": event.event_id, + "timestamp": event.timestamp, + "source_type": ExportEvent.SourceType.Name(event.source_type), + "event_data": event_data_json + } + return json.dumps(event_json) + +def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: str): + logger = logging.getLogger("_ray_export_event_logger") + logger.setLevel(logging.INFO) + dir_path = pathlib.Path(sink_dir) / "events" + filepath = dir_path / 
f"event_{source}.log" + dir_path.mkdir(exist_ok=True) + filepath.touch(exist_ok=True) + # Configure the logger. + # TODO: make this RotatingFileHandler + handler = logging.FileHandler(filepath) + # formatter = logging.Formatter("%(message)s") + # handler.setFormatter(formatter) + logger.addHandler(handler) + logger.propagate = False + return logger + +# This lock must be used when accessing or updating global event logger dict. +_export_event_logger_lock = threading.Lock() +_export_event_logger = {} + + +def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str): + """Get the export event logger of the current process. + + There's only one logger per export event source. + + Args: + source: The source of the export event. + sink_dir: The directory to sink event logs. + """ + with _export_event_logger_lock: + global _export_event_logger + source_name = ExportEvent.SourceType.Name(source) + if source_name not in _export_event_logger: + logger = _build_export_event_file_logger(source_name, sink_dir) + _export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger) + + return _export_event_logger[source_name] \ No newline at end of file diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index c7dbb18aafc4..7cbbddae5235 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -17,6 +17,8 @@ import ray from ray._private.event.event_logger import filter_event_by_level, get_event_logger +from ray._private.event.export_event_logger import get_export_event_logger +from ray._private.protobuf_compat import message_to_dict from ray._private.test_utils import ( format_web_url, wait_for_condition, @@ -25,6 +27,7 @@ from ray._private.utils import binary_to_hex from ray.cluster_utils import AutoscalingCluster from ray.core.generated import event_pb2 +from ray.core.generated.export_api import 
export_event_pb2, export_task_event_pb2 from ray.dashboard.modules.event import event_consts from ray.dashboard.modules.event.event_utils import monitor_events from ray.dashboard.tests.conftest import * # noqa @@ -545,5 +548,33 @@ def verify(): pprint(list_cluster_events()) +def test_export_event_logger(tmp_path): + logger = get_export_event_logger(export_event_pb2.ExportEvent.SourceType.EXPORT_TASK, str(tmp_path)) + event_data = export_task_event_pb2.ExportTaskEventData( + task_id=b"task_id0", + attempt_number=1, + job_id=b"job_id0", + ) + logger.send_event(event_data) + + event_dir = tmp_path / "events" + assert event_dir.exists() + event_file = event_dir / "event_EXPORT_TASK.log" + assert event_file.exists() + + with event_file.open() as f: + lines = f.readlines() + assert len(lines) == 1 + + line = lines[0] + data = json.loads(line) + assert data["source_type"] == "EXPORT_TASK" + assert data["event_data"] == message_to_dict( + event_data, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=False, + ) + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index 26bace662c17..f575173f4cb1 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -245,10 +245,13 @@ python_grpc_compile( deps = [":event_proto"], ) +# Note: Any dependencies added to this rule should also be included +# in the :export_api_py_proto filegroup so the python files are +# correctly generated. 
proto_library( name = "export_event_proto", srcs = ["export_api/export_event.proto"], - deps = [":export_task_event_proto"], + deps = [":export_task_event_proto", ":export_node_event_proto", ":export_actor_event_proto"], ) cc_proto_library( @@ -264,7 +267,7 @@ python_grpc_compile( proto_library( name = "export_task_event_proto", srcs = ["export_api/export_task_event.proto"], - deps = [":common_proto", ":export_runtime_env_proto", ":export_node_event_proto", ":export_actor_event_proto"], + deps = [":common_proto", ":export_runtime_env_proto"], ) cc_proto_library( From e1638022e0322fbbc4a8c795e996446a12cf850d Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 09:45:26 -0700 Subject: [PATCH 02/25] add python logger Signed-off-by: Nikita Vemuri --- .../ray/_private/event/export_event_logger.py | 67 +++++++++++-------- .../modules/event/tests/test_event.py | 15 +++-- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index b56a289f2530..405cfc8bc11f 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -3,15 +3,11 @@ import json import random import string -import socket -import os import threading -from typing import Dict, Optional, Union +from typing import Union from datetime import datetime -from google.protobuf.json_format import Parse - import ray.core.generated.export_api as export_api_protos from ray.core.generated.export_api.export_event_pb2 import ExportEvent from ray._private.protobuf_compat import message_to_dict @@ -25,23 +21,27 @@ export_api_protos.export_actor_data_pb2.ExportActorData, ] + def get_event_id(): return "".join([random.choice(string.hexdigits) for _ in range(18)]) + class ExportEventLoggerAdapter: def __init__(self, source: ExportEvent.SourceType, logger: logging.Logger): - """Adapter for the Python logger that's used to emit export events. 
- """ + """Adapter for the Python logger that's used to emit export events.""" self.logger = logger self.source = source - + def send_event(self, event_data: ExportEventDataType): # NOTE: Python logger is thread-safe, # so we don't need to protect it using locks. try: event = self._create_export_event(event_data) except TypeError: - # TODO: log that invalid event_data type passed + global_logger.exception( + "Failed to create ExportEvent from event_data so no " + "event will be written to file." + ) return event_as_str = self._export_event_to_string(event) @@ -49,45 +49,58 @@ def send_event(self, event_data: ExportEventDataType): self.logger.info(event_as_str) # Force flush so that we won't lose events self.logger.handlers[0].flush() - + def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent: event = ExportEvent() event.event_id = get_event_id() event.timestamp = int(datetime.now().timestamp()) - if type(event_data) is export_api_protos.export_task_event_pb2.ExportTaskEventData: + if ( + type(event_data) + is export_api_protos.export_task_event_pb2.ExportTaskEventData + ): event.task_event_data.CopyFrom(event_data) event.source_type = ExportEvent.SourceType.EXPORT_TASK else: - raise TypeError("Invalid event_data type") + raise TypeError(f"Invalid event_data type: {type(event_data)}") if event.source_type != self.source: - # TODO: log that source type mismatch + global_logger.error( + f"event_data has source type {event.source_type}, however " + f"the event was sent to a logger with source {self.source}. " + f"The event will still be written to the file of {self.source} " + "but this indicates a bug in the code." 
+ ) pass return event - + def _export_event_to_string(self, event: ExportEvent) -> str: event_data_json = {} proto_to_dict_options = { - "always_print_fields_with_no_presence":True, - "preserving_proto_field_name":True, - "use_integers_for_enums":False + "always_print_fields_with_no_presence": True, + "preserving_proto_field_name": True, + "use_integers_for_enums": False, } - event_data_field_set = event.WhichOneof('event_data') + event_data_field_set = event.WhichOneof("event_data") if event_data_field_set: event_data_json = message_to_dict( getattr(event, event_data_field_set), **proto_to_dict_options, ) else: - # TODO: log that no event data passed + global_logger.error( + f"event_data missing from export event with id {event.event_id} " + f"and type {event.source_type}. An empty event will be written, " + "but this indicates a bug in the code. " + ) pass event_json = { - "event_id": event.event_id, - "timestamp": event.timestamp, + "event_id": event.event_id, + "timestamp": event.timestamp, "source_type": ExportEvent.SourceType.Name(event.source_type), - "event_data": event_data_json + "event_data": event_data_json, } return json.dumps(event_json) + def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: str): logger = logging.getLogger("_ray_export_event_logger") logger.setLevel(logging.INFO) @@ -96,14 +109,14 @@ def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: st dir_path.mkdir(exist_ok=True) filepath.touch(exist_ok=True) # Configure the logger. - # TODO: make this RotatingFileHandler - handler = logging.FileHandler(filepath) - # formatter = logging.Formatter("%(message)s") - # handler.setFormatter(formatter) + handler = logging.handlers.RotatingFileHandler( + filepath, maxBytes=(100 * 1e6), backupCount=20 # 100 MB max file size + ) logger.addHandler(handler) logger.propagate = False return logger + # This lock must be used when accessing or updating global event logger dict. 
_export_event_logger_lock = threading.Lock() _export_event_logger = {} @@ -125,4 +138,4 @@ def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str): logger = _build_export_event_file_logger(source_name, sink_dir) _export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger) - return _export_event_logger[source_name] \ No newline at end of file + return _export_event_logger[source_name] diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index 7cbbddae5235..807412502ddd 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -549,7 +549,9 @@ def verify(): def test_export_event_logger(tmp_path): - logger = get_export_event_logger(export_event_pb2.ExportEvent.SourceType.EXPORT_TASK, str(tmp_path)) + logger = get_export_event_logger( + export_event_pb2.ExportEvent.SourceType.EXPORT_TASK, str(tmp_path) + ) event_data = export_task_event_pb2.ExportTaskEventData( task_id=b"task_id0", attempt_number=1, @@ -570,11 +572,12 @@ def test_export_event_logger(tmp_path): data = json.loads(line) assert data["source_type"] == "EXPORT_TASK" assert data["event_data"] == message_to_dict( - event_data, - always_print_fields_with_no_presence=True, - preserving_proto_field_name=True, - use_integers_for_enums=False, - ) + event_data, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=False, + ) + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) From 36328079b3479d9d1e7551bff26d4de561b9d85a Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 11:40:58 -0700 Subject: [PATCH 03/25] add schema Signed-off-by: Nikita Vemuri --- BUILD.bazel | 1 + .../ray/_private/event/export_event_logger.py | 11 ++-- .../modules/event/tests/test_event.py | 17 ++--- python/ray/dashboard/modules/job/common.py | 26 +++++++- src/ray/protobuf/BUILD | 22 
++++++- .../protobuf/export_api/export_event.proto | 3 + .../export_submission_job_event.proto | 65 +++++++++++++++++++ 7 files changed, 128 insertions(+), 17 deletions(-) create mode 100644 src/ray/protobuf/export_api/export_submission_job_event.proto diff --git a/BUILD.bazel b/BUILD.bazel index 735c14246262..a4cc859d5ca9 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2507,6 +2507,7 @@ filegroup( "//src/ray/protobuf:export_task_event_py_proto", "//src/ray/protobuf:export_node_event_py_proto", "//src/ray/protobuf:export_actor_event_py_proto", + "//src/ray/protobuf:export_submission_job_event_py_proto", ], ) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 405cfc8bc11f..7c112e982298 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -8,18 +8,15 @@ from typing import Union from datetime import datetime -import ray.core.generated.export_api as export_api_protos from ray.core.generated.export_api.export_event_pb2 import ExportEvent +from ray.core.generated.export_api.export_submission_job_event_pb2 import ExportSubmissionJobEventData from ray._private.protobuf_compat import message_to_dict global_logger = logging.getLogger(__name__) -# TODO: update to only contain types that write python events -ExportEventDataType = Union[ - export_api_protos.export_task_event_pb2.ExportTaskEventData, - export_api_protos.export_node_data_pb2.ExportNodeData, - export_api_protos.export_actor_data_pb2.ExportActorData, -] +# This contains the union of export event data types which emit events +# using the python ExportEventLoggerAdapter +ExportEventDataType = Union[ExportSubmissionJobEventData] def get_event_id(): diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index 807412502ddd..e04850c71d97 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ 
b/python/ray/dashboard/modules/event/tests/test_event.py @@ -27,7 +27,7 @@ from ray._private.utils import binary_to_hex from ray.cluster_utils import AutoscalingCluster from ray.core.generated import event_pb2 -from ray.core.generated.export_api import export_event_pb2, export_task_event_pb2 +from ray.core.generated.export_api import export_event_pb2, export_submission_job_event_pb2 from ray.dashboard.modules.event import event_consts from ray.dashboard.modules.event.event_utils import monitor_events from ray.dashboard.tests.conftest import * # noqa @@ -550,18 +550,19 @@ def verify(): def test_export_event_logger(tmp_path): logger = get_export_event_logger( - export_event_pb2.ExportEvent.SourceType.EXPORT_TASK, str(tmp_path) + export_event_pb2.ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, str(tmp_path) ) - event_data = export_task_event_pb2.ExportTaskEventData( - task_id=b"task_id0", - attempt_number=1, - job_id=b"job_id0", + event_data = export_submission_job_event_pb2.ExportSubmissionJobEventData( + submission_job_id="submission_job_id0", + status=export_submission_job_event_pb2.ExportSubmissionJobEventData.JobStatus.RUNNING, + entrypoint="ls", + metadata={}, ) logger.send_event(event_data) event_dir = tmp_path / "events" assert event_dir.exists() - event_file = event_dir / "event_EXPORT_TASK.log" + event_file = event_dir / "event_EXPORT_SUBMISSION_JOB.log" assert event_file.exists() with event_file.open() as f: @@ -570,7 +571,7 @@ def test_export_event_logger(tmp_path): line = lines[0] data = json.loads(line) - assert data["source_type"] == "EXPORT_TASK" + assert data["source_type"] == "EXPORT_SUBMISSION_JOB" assert data["event_data"] == message_to_dict( event_data, always_print_fields_with_no_presence=True, diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 06613ec27e42..c2f95ea0de3d 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -7,8 +7,10 @@ 
from typing import Any, Dict, Optional, Tuple, Union from ray._private import ray_constants +from ray._private.event.export_event_logger import ExportEventLoggerAdapter from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.packaging import parse_uri +from ray.core.generated.export_api.export_submission_job_event_pb2 import ExportSubmissionJobEventData from ray.util.annotations import PublicAPI # NOTE(edoakes): these constants should be considered a public API because @@ -189,8 +191,9 @@ class JobInfoStorageClient: JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" - def __init__(self, gcs_aio_client: GcsAioClient): + def __init__(self, gcs_aio_client: GcsAioClient, export_submission_job_event_logger: Optional[ExportEventLoggerAdapter] = None): self._gcs_aio_client = gcs_aio_client + self._export_submission_job_event_logger = export_event_logger async def put_info( self, job_id: str, job_info: JobInfo, overwrite: bool = True @@ -211,7 +214,28 @@ async def put_info( overwrite, namespace=ray_constants.KV_NAMESPACE_JOB, ) + self._write_submission_job_export_event(job_id, job_info) return added_num == 1 + + def _write_submission_job_export_event(self, job_id: str, job_info: JobInfo) -> None: + submision_event_data = ExportSubmissionJobEventData( + submission_job_id=job_id, + status=ExportSubmissionJobEventData.JobStatus.Name(job_info.status), + entrypoint=job_info.entrypoint, + message=job_info.message, + error_type=job_info.error_type, + start_time=job_info.start_time, + end_time=job_info.end_time, + runtime_env_json=json.dumps(job_info.runtime_env), + driver_agent_http_address=job_info.driver_agent_http_address, + driver_node_id=job_info.driver_node_id, + driver_exit_code=job_info.driver_exit_code, + ) + submission_event_data.metadata.CopyFrom(job_info.metadata) + + if self._export_submission_job_event_logger: + 
self._export_submission_job_event_logger.send_event(submission_event_data) + async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]: serialized_info = await self._gcs_aio_client.internal_kv_get( diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index f575173f4cb1..bacf1fbe0481 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -251,7 +251,12 @@ python_grpc_compile( proto_library( name = "export_event_proto", srcs = ["export_api/export_event.proto"], - deps = [":export_task_event_proto", ":export_node_event_proto", ":export_actor_event_proto"], + deps = [ + ":export_task_event_proto", + ":export_node_event_proto", + ":export_actor_event_proto", + ":export_submission_job_event_proto" + ], ) cc_proto_library( @@ -326,6 +331,21 @@ python_grpc_compile( deps = [":export_actor_event_proto"], ) +proto_library( + name = "export_submission_job_event_proto", + srcs = ["export_api/export_submission_job_event.proto"], +) + +cc_proto_library( + name = "export_submission_job_event_cc_proto", + deps = [":export_submission_job_event_proto"], +) + +python_grpc_compile( + name = "export_submission_job_event_py_proto", + deps = [":export_submission_job_event_proto"], +) + # Job agent. proto_library( name = "job_agent_proto", diff --git a/src/ray/protobuf/export_api/export_event.proto b/src/ray/protobuf/export_api/export_event.proto index e2491af123d3..835e23b7446f 100644 --- a/src/ray/protobuf/export_api/export_event.proto +++ b/src/ray/protobuf/export_api/export_event.proto @@ -20,6 +20,7 @@ package ray.rpc; import "src/ray/protobuf/export_api/export_task_event.proto"; import "src/ray/protobuf/export_api/export_node_data.proto"; import "src/ray/protobuf/export_api/export_actor_data.proto"; +import "src/ray/protobuf/export_api/export_submission_job_event.proto"; // ExportEvent defines events stored by the export API. This // schema is public and any changes must be backward compatible. 
@@ -28,6 +29,7 @@ message ExportEvent { EXPORT_TASK = 0; EXPORT_NODE = 1; EXPORT_ACTOR = 2; + EXPORT_SUBMISSION_JOB = 4; } // event_id is the unique ID of this event @@ -42,5 +44,6 @@ message ExportEvent { ExportTaskEventData task_event_data = 4; ExportNodeData node_event_data = 5; ExportActorData actor_event_data = 6; + ExportSubmissionJobEventData submission_job_event_data = 8; } } diff --git a/src/ray/protobuf/export_api/export_submission_job_event.proto b/src/ray/protobuf/export_api/export_submission_job_event.proto new file mode 100644 index 000000000000..611b4e3cce0f --- /dev/null +++ b/src/ray/protobuf/export_api/export_submission_job_event.proto @@ -0,0 +1,65 @@ +// Copyright 2024 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +option cc_enable_arenas = true; + +package ray.rpc; + +// ExportSubmissionJobEventData defines the event_data stored by the export API +// for EXPORT_SUBMISSION_JOB type events. This schema is public and any changes must +// be backward compatible. +message ExportSubmissionJobEventData { + // Status of a job. This should be kept in sync with + // python/ray/dashboard/modules/job/common.JobStatus + enum JobStatus { + // The job has not started yet, likely waiting for the runtime_env to be set up. + PENDING = 0; + // The job is currently running. + RUNNING = 1; + // The job was intentionally stopped by the user. + STOPPED = 2; + // The job finished successfully. 
+ SUCCEEDED = 3; + // The job failed. + FAILED = 4; + } + + // Submission ID of the job. + string submission_job_id = 1; + // The status of the job. + JobStatus status = 2; + // The entrypoint command for this job. + string entrypoint = 3; + // A message describing the status in more detail. + optional string message = 4; + // Error type (e.g. Runtime env setup failure, Internal error, user script error) + optional string error_type = 5; + // The time when the job was started. A Unix timestamp in ms. + optional uint64 start_time = 6; + // The time when the job moved into a terminal state. A Unix timestamp in ms. + optional uint64 end_time = 7; + // Arbitrary user-provided metadata for the job. + map<string, string> metadata = 8; + // The JSON-serialized runtime environment for the job. + optional string runtime_env_json = 9; + // Driver agent http address + optional string driver_agent_http_address = 10; + // The node id that driver running on. It will be None only when the job status + // is PENDING, and this field will not be deleted or modified even if the driver dies + optional string driver_node_id = 11; + // The driver process exit code after the driver executed. 
Return None if driver + // doesn't finish executing + optional int32 driver_exit_code = 12; +} \ No newline at end of file From 098b07f05f95c4d9242a5e6b0c7a9c95b413b8a2 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 12:24:02 -0700 Subject: [PATCH 04/25] wip Signed-off-by: Nikita Vemuri --- .../ray/_private/event/export_event_logger.py | 13 ++++++------ .../modules/event/tests/test_event.py | 12 ++++++++--- python/ray/dashboard/modules/job/common.py | 21 ++++++++++++------- .../ray/dashboard/modules/job/job_manager.py | 13 +++++++++++- 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 7c112e982298..62d3861ba067 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -9,7 +9,9 @@ from datetime import datetime from ray.core.generated.export_api.export_event_pb2 import ExportEvent -from ray.core.generated.export_api.export_submission_job_event_pb2 import ExportSubmissionJobEventData +from ray.core.generated.export_api.export_submission_job_event_pb2 import ( + ExportSubmissionJobEventData, +) from ray._private.protobuf_compat import message_to_dict global_logger = logging.getLogger(__name__) @@ -51,12 +53,9 @@ def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent: event = ExportEvent() event.event_id = get_event_id() event.timestamp = int(datetime.now().timestamp()) - if ( - type(event_data) - is export_api_protos.export_task_event_pb2.ExportTaskEventData - ): - event.task_event_data.CopyFrom(event_data) - event.source_type = ExportEvent.SourceType.EXPORT_TASK + if type(event_data) is ExportSubmissionJobEventData: + event.submission_job_event_data.CopyFrom(event_data) + event.source_type = ExportEvent.SourceType.EXPORT_SUBMISSION_JOB else: raise TypeError(f"Invalid event_data type: {type(event_data)}") if event.source_type != self.source: diff 
--git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index e04850c71d97..ffdcb4cf6381 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -27,7 +27,10 @@ from ray._private.utils import binary_to_hex from ray.cluster_utils import AutoscalingCluster from ray.core.generated import event_pb2 -from ray.core.generated.export_api import export_event_pb2, export_submission_job_event_pb2 +from ray.core.generated.export_api import ( + export_event_pb2, + export_submission_job_event_pb2, +) from ray.dashboard.modules.event import event_consts from ray.dashboard.modules.event.event_utils import monitor_events from ray.dashboard.tests.conftest import * # noqa @@ -552,9 +555,12 @@ def test_export_event_logger(tmp_path): logger = get_export_event_logger( export_event_pb2.ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, str(tmp_path) ) - event_data = export_submission_job_event_pb2.ExportSubmissionJobEventData( + ExportSubmissionJobEventData = ( + export_submission_job_event_pb2.ExportSubmissionJobEventData + ) + event_data = ExportSubmissionJobEventData( submission_job_id="submission_job_id0", - status=export_submission_job_event_pb2.ExportSubmissionJobEventData.JobStatus.RUNNING, + status=ExportSubmissionJobEventData.JobStatus.RUNNING, entrypoint="ls", metadata={}, ) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index c2f95ea0de3d..14f36a22eada 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -10,7 +10,9 @@ from ray._private.event.export_event_logger import ExportEventLoggerAdapter from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.packaging import parse_uri -from ray.core.generated.export_api.export_submission_job_event_pb2 import ExportSubmissionJobEventData +from 
ray.core.generated.export_api.export_submission_job_event_pb2 import ( + ExportSubmissionJobEventData, +) from ray.util.annotations import PublicAPI # NOTE(edoakes): these constants should be considered a public API because @@ -191,9 +193,13 @@ class JobInfoStorageClient: JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" - def __init__(self, gcs_aio_client: GcsAioClient, export_submission_job_event_logger: Optional[ExportEventLoggerAdapter] = None): + def __init__( + self, + gcs_aio_client: GcsAioClient, + export_submission_job_event_logger: Optional[ExportEventLoggerAdapter] = None, + ): self._gcs_aio_client = gcs_aio_client - self._export_submission_job_event_logger = export_event_logger + self._export_submission_job_event_logger = export_submission_job_event_logger async def put_info( self, job_id: str, job_info: JobInfo, overwrite: bool = True @@ -216,9 +222,11 @@ async def put_info( ) self._write_submission_job_export_event(job_id, job_info) return added_num == 1 - - def _write_submission_job_export_event(self, job_id: str, job_info: JobInfo) -> None: - submision_event_data = ExportSubmissionJobEventData( + + def _write_submission_job_export_event( + self, job_id: str, job_info: JobInfo + ) -> None: + submission_event_data = ExportSubmissionJobEventData( submission_job_id=job_id, status=ExportSubmissionJobEventData.JobStatus.Name(job_info.status), entrypoint=job_info.entrypoint, @@ -236,7 +244,6 @@ def _write_submission_job_export_event(self, job_id: str, job_info: JobInfo) -> if self._export_submission_job_event_logger: self._export_submission_job_event_logger.send_event(submission_event_data) - async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]: serialized_info = await self._gcs_aio_client.internal_kv_get( self.JOB_DATA_KEY.format(job_id=job_id).encode(), diff --git a/python/ray/dashboard/modules/job/job_manager.py 
b/python/ray/dashboard/modules/job/job_manager.py index a4bbeecd2868..2a24350296bd 100644 --- a/python/ray/dashboard/modules/job/job_manager.py +++ b/python/ray/dashboard/modules/job/job_manager.py @@ -11,10 +11,12 @@ import ray import ray._private.ray_constants as ray_constants from ray._private.event.event_logger import get_event_logger +from ray._private.event.export_event_logger import get_export_event_logger from ray._private.gcs_utils import GcsAioClient from ray._private.utils import run_background_task from ray.actor import ActorHandle from ray.core.generated.event_pb2 import Event +from ray.core.generated.export_api.export_event_pb2 import ExportEvent from ray.dashboard.consts import ( DEFAULT_JOB_START_TIMEOUT_SECONDS, RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, @@ -70,7 +72,16 @@ class JobManager: def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): self._gcs_aio_client = gcs_aio_client - self._job_info_client = JobInfoStorageClient(gcs_aio_client) + try: + # TODO: Add FF + export_submission_job_event_logger = get_export_event_logger( + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, logs_dir + ) + except Exception: + export_submission_job_event_logger = None + self._job_info_client = JobInfoStorageClient( + gcs_aio_client, export_submission_job_event_logger + ) self._gcs_address = gcs_aio_client.address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) From 24fa611da7364a753290cff8d44d7ad26b93d9da Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 14:26:49 -0700 Subject: [PATCH 05/25] wip Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 39 ++++++++++++++++--- .../ray/dashboard/modules/job/job_manager.py | 13 +------ 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 14f36a22eada..8f04143e65d8 100644 --- a/python/ray/dashboard/modules/job/common.py 
+++ b/python/ray/dashboard/modules/job/common.py @@ -1,15 +1,18 @@ import asyncio import json +import logging import time from dataclasses import asdict, dataclass, replace from enum import Enum from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union +import ray from ray._private import ray_constants -from ray._private.event.export_event_logger import ExportEventLoggerAdapter +from ray._private.event.export_event_logger import get_export_event_logger from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.packaging import parse_uri +from ray.core.generated.export_api.export_event_pb2 import ExportEvent from ray.core.generated.export_api.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, ) @@ -27,6 +30,8 @@ SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE" JOB_LOGS_PATH_TEMPLATE = "job-driver-{submission_id}.log" +logger = logging.getLogger(__name__) + @PublicAPI(stability="stable") class JobStatus(str, Enum): @@ -196,10 +201,20 @@ class JobInfoStorageClient: def __init__( self, gcs_aio_client: GcsAioClient, - export_submission_job_event_logger: Optional[ExportEventLoggerAdapter] = None, ): self._gcs_aio_client = gcs_aio_client - self._export_submission_job_event_logger = export_submission_job_event_logger + try: + # TODO: Add FF + log_dir = ray._private.worker._global_node.get_logs_dir_path() + self._export_submission_job_event_logger = get_export_event_logger( + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir + ) + except Exception: + logger.exception( + "Unable to initialize export event logger so no export " + "events will be written." 
+ ) + self._export_submission_job_event_logger = None async def put_info( self, job_id: str, job_info: JobInfo, overwrite: bool = True @@ -226,11 +241,26 @@ async def put_info( def _write_submission_job_export_event( self, job_id: str, job_info: JobInfo ) -> None: + job_status = None + status_value_descriptor = ( + ExportSubmissionJobEventData.JobStatus.DESCRIPTOR.values_by_name.get( + job_info.status.name + ) + ) + if status_value_descriptor is None: + logger.error( + f"{job_info.status.name} is not a valid " + "ExportSubmissionJobEventData.JobStatus enum value. This event " + "will not be written." + ) + return + job_status = status_value_descriptor.number submission_event_data = ExportSubmissionJobEventData( submission_job_id=job_id, - status=ExportSubmissionJobEventData.JobStatus.Name(job_info.status), + status=job_status, entrypoint=job_info.entrypoint, message=job_info.message, + metadata=job_info.metadata, error_type=job_info.error_type, start_time=job_info.start_time, end_time=job_info.end_time, @@ -239,7 +269,6 @@ def _write_submission_job_export_event( driver_node_id=job_info.driver_node_id, driver_exit_code=job_info.driver_exit_code, ) - submission_event_data.metadata.CopyFrom(job_info.metadata) if self._export_submission_job_event_logger: self._export_submission_job_event_logger.send_event(submission_event_data) diff --git a/python/ray/dashboard/modules/job/job_manager.py b/python/ray/dashboard/modules/job/job_manager.py index 2a24350296bd..a4bbeecd2868 100644 --- a/python/ray/dashboard/modules/job/job_manager.py +++ b/python/ray/dashboard/modules/job/job_manager.py @@ -11,12 +11,10 @@ import ray import ray._private.ray_constants as ray_constants from ray._private.event.event_logger import get_event_logger -from ray._private.event.export_event_logger import get_export_event_logger from ray._private.gcs_utils import GcsAioClient from ray._private.utils import run_background_task from ray.actor import ActorHandle from ray.core.generated.event_pb2 import 
Event -from ray.core.generated.export_api.export_event_pb2 import ExportEvent from ray.dashboard.consts import ( DEFAULT_JOB_START_TIMEOUT_SECONDS, RAY_JOB_ALLOW_DRIVER_ON_WORKER_NODES_ENV_VAR, @@ -72,16 +70,7 @@ class JobManager: def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): self._gcs_aio_client = gcs_aio_client - try: - # TODO: Add FF - export_submission_job_event_logger = get_export_event_logger( - ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, logs_dir - ) - except Exception: - export_submission_job_event_logger = None - self._job_info_client = JobInfoStorageClient( - gcs_aio_client, export_submission_job_event_logger - ) + self._job_info_client = JobInfoStorageClient(gcs_aio_client) self._gcs_address = gcs_aio_client.address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) From bdcfff8f8716fa70f2d53285591c2c8e08f26b46 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 14:51:55 -0700 Subject: [PATCH 06/25] add test Signed-off-by: Nikita Vemuri --- .../modules/job/tests/test_job_manager.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index e2e5bc2e5e5c..39116f137fff 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -1,4 +1,5 @@ import asyncio +import json import os import signal import sys @@ -180,6 +181,49 @@ async def test_get_all_job_info(call_ray_start, tmp_path): # noqa: F811 assert found +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + ["ray start --head"], + indirect=True, +) +async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 + """Submission job export events are correctly written""" + address_info = ray.init(address=call_ray_start) + gcs_aio_client = GcsAioClient( + 
address=address_info["gcs_address"], nums_reconnect_retry=0 + ) + log_dir = ray._private.worker._global_node.get_logs_dir_path() + job_manager = JobManager(gcs_aio_client, log_dir) + + # Submit a job. + submission_id = await job_manager.submit_job( + entrypoint="python -c 'import ray; ray.init()'", + ) + + # Wait for the job to be finished. + await async_wait_for_condition_async_predicate( + check_job_succeeded, job_manager=job_manager, job_id=submission_id + ) + + # Verify export events are written + event_dir = f"{log_dir}/events" + assert os.path.isdir(event_dir) + event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" + assert os.path.isfile(event_file) + + with open(event_file, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"] + + for line, expected_status in zip(lines, expected_status_values): + data = json.loads(line) + assert data["source_type"] == "EXPORT_SUBMISSION_JOB" + assert data["event_data"]["submission_job_id"] == submission_id + assert data["event_data"]["status"] == expected_status + + @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", From d19b595df134a22911865833e8dbdc22d9a5badc Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Tue, 3 Sep 2024 21:10:14 -0700 Subject: [PATCH 07/25] add test Signed-off-by: Nikita Vemuri --- python/ray/_private/ray_constants.py | 2 + python/ray/dashboard/modules/job/common.py | 14 ++- .../modules/job/tests/test_job_manager.py | 95 ++++++++++--------- 3 files changed, 63 insertions(+), 48 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 81e32029b6a1..f97a1918c89b 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -501,3 +501,5 @@ def gcs_actor_scheduling_enabled(): PLACEMENT_GROUP_BUNDLE_RESOURCE_NAME = "bundle" RAY_LOGGING_CONFIG_ENCODING = os.environ.get("RAY_LOGGING_CONFIG_ENCODING") + +RAY_ENABLE_EXPORT_API_WRITE = 
env_bool("RAY_enable_export_api_write", False) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 8f04143e65d8..7a8130d773eb 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -204,10 +204,13 @@ def __init__( ): self._gcs_aio_client = gcs_aio_client try: - # TODO: Add FF log_dir = ray._private.worker._global_node.get_logs_dir_path() - self._export_submission_job_event_logger = get_export_event_logger( - ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir + self._export_submission_job_event_logger = ( + get_export_event_logger( + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir + ) + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE + else None ) except Exception: logger.exception( @@ -235,13 +238,14 @@ async def put_info( overwrite, namespace=ray_constants.KV_NAMESPACE_JOB, ) - self._write_submission_job_export_event(job_id, job_info) + if added_num == 1 or overwrite: + # Write export event if data was updated in the KV store + self._write_submission_job_export_event(job_id, job_info) return added_num == 1 def _write_submission_job_export_event( self, job_id: str, job_info: JobInfo ) -> None: - job_status = None status_value_descriptor = ( ExportSubmissionJobEventData.JobStatus.DESCRIPTOR.values_by_name.get( job_info.status.name diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index 39116f137fff..54576c0b2a36 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -11,6 +11,7 @@ import pytest import ray +from ray._private import ray_constants from ray._private.gcs_utils import GcsAioClient from ray._private.ray_constants import ( DEFAULT_DASHBOARD_AGENT_LISTEN_PORT, @@ -47,6 +48,57 @@ import psutil +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + 
"RAY_enable_export_api_write": "true", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) +async def test_submission_job_export_events(call_ray_start): # noqa: F811 + """Submission job export events are correctly written""" + ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True + address_info = ray.init(address=call_ray_start) + gcs_aio_client = GcsAioClient( + address=address_info["gcs_address"], nums_reconnect_retry=0 + ) + log_dir = ray._private.worker._global_node.get_logs_dir_path() + job_manager = JobManager(gcs_aio_client, log_dir) + + # Submit a job. + submission_id = await job_manager.submit_job( + entrypoint="python -c 'import ray; ray.init()'", + ) + + # Wait for the job to be finished. + await async_wait_for_condition_async_predicate( + check_job_succeeded, job_manager=job_manager, job_id=submission_id + ) + + # Verify export events are written + event_dir = f"{log_dir}/events" + assert os.path.isdir(event_dir) + event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" + assert os.path.isfile(event_file) + + with open(event_file, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"] + + for line, expected_status in zip(lines, expected_status_values): + data = json.loads(line) + assert data["source_type"] == "EXPORT_SUBMISSION_JOB" + assert data["event_data"]["submission_job_id"] == submission_id + assert data["event_data"]["status"] == expected_status + + @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", @@ -181,49 +233,6 @@ async def test_get_all_job_info(call_ray_start, tmp_path): # noqa: F811 assert found -@pytest.mark.asyncio -@pytest.mark.parametrize( - "call_ray_start", - ["ray start --head"], - indirect=True, -) -async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 - """Submission job export events are correctly written""" - address_info = ray.init(address=call_ray_start) - gcs_aio_client = GcsAioClient( - 
address=address_info["gcs_address"], nums_reconnect_retry=0 - ) - log_dir = ray._private.worker._global_node.get_logs_dir_path() - job_manager = JobManager(gcs_aio_client, log_dir) - - # Submit a job. - submission_id = await job_manager.submit_job( - entrypoint="python -c 'import ray; ray.init()'", - ) - - # Wait for the job to be finished. - await async_wait_for_condition_async_predicate( - check_job_succeeded, job_manager=job_manager, job_id=submission_id - ) - - # Verify export events are written - event_dir = f"{log_dir}/events" - assert os.path.isdir(event_dir) - event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" - assert os.path.isfile(event_file) - - with open(event_file, "r") as f: - lines = f.readlines() - assert len(lines) == 3 - expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"] - - for line, expected_status in zip(lines, expected_status_values): - data = json.loads(line) - assert data["source_type"] == "EXPORT_SUBMISSION_JOB" - assert data["event_data"]["submission_job_id"] == submission_id - assert data["event_data"]["status"] == expected_status - - @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", From 4794342c6d45551c16c78b0949d7d1b6947ef60a Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 4 Sep 2024 08:22:33 -0700 Subject: [PATCH 08/25] update Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 7a8130d773eb..753a4bed41b1 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -203,21 +203,18 @@ def __init__( gcs_aio_client: GcsAioClient, ): self._gcs_aio_client = gcs_aio_client + self._export_submission_job_event_logger = None try: - log_dir = ray._private.worker._global_node.get_logs_dir_path() - self._export_submission_job_event_logger = ( - 
get_export_event_logger( + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE: + log_dir = ray._private.worker._global_node.get_logs_dir_path() + self._export_submission_job_event_logger = get_export_event_logger( ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir ) - if ray_constants.RAY_ENABLE_EXPORT_API_WRITE - else None - ) except Exception: logger.exception( "Unable to initialize export event logger so no export " "events will be written." ) - self._export_submission_job_event_logger = None async def put_info( self, job_id: str, job_info: JobInfo, overwrite: bool = True From ce59bc90dc197ea3514d20ab59dbca5becc924a0 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 4 Sep 2024 10:36:02 -0700 Subject: [PATCH 09/25] pass logdir Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 9 ++------- python/ray/dashboard/modules/job/job_manager.py | 11 +++++++++-- python/ray/dashboard/modules/job/job_supervisor.py | 3 ++- .../dashboard/modules/job/tests/test_job_manager.py | 7 +++---- 4 files changed, 16 insertions(+), 14 deletions(-) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 753a4bed41b1..8b61a1666c80 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -7,7 +7,6 @@ from pathlib import Path from typing import Any, Dict, Optional, Tuple, Union -import ray from ray._private import ray_constants from ray._private.event.export_event_logger import get_export_event_logger from ray._private.gcs_utils import GcsAioClient @@ -198,15 +197,11 @@ class JobInfoStorageClient: JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" - def __init__( - self, - gcs_aio_client: GcsAioClient, - ): + def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None): self._gcs_aio_client = gcs_aio_client self._export_submission_job_event_logger = None try: - if 
ray_constants.RAY_ENABLE_EXPORT_API_WRITE: - log_dir = ray._private.worker._global_node.get_logs_dir_path() + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE and log_dir: self._export_submission_job_event_logger = get_export_event_logger( ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir ) diff --git a/python/ray/dashboard/modules/job/job_manager.py b/python/ray/dashboard/modules/job/job_manager.py index a4bbeecd2868..ab4d76546747 100644 --- a/python/ray/dashboard/modules/job/job_manager.py +++ b/python/ray/dashboard/modules/job/job_manager.py @@ -70,7 +70,8 @@ class JobManager: def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str): self._gcs_aio_client = gcs_aio_client - self._job_info_client = JobInfoStorageClient(gcs_aio_client) + self._logs_dir = logs_dir + self._job_info_client = JobInfoStorageClient(gcs_aio_client, logs_dir) self._gcs_address = gcs_aio_client.address self._log_client = JobLogStorageClient() self._supervisor_actor_cls = ray.remote(JobSupervisor) @@ -537,7 +538,13 @@ async def submit_job( runtime_env, submission_id, resources_specified ), namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE, - ).remote(submission_id, entrypoint, metadata or {}, self._gcs_address) + ).remote( + submission_id, + entrypoint, + metadata or {}, + self._gcs_address, + self._logs_dir, + ) supervisor.run.remote( _start_signal_actor=_start_signal_actor, resources_specified=resources_specified, diff --git a/python/ray/dashboard/modules/job/job_supervisor.py b/python/ray/dashboard/modules/job/job_supervisor.py index a548af3afd86..d59526491ca4 100644 --- a/python/ray/dashboard/modules/job/job_supervisor.py +++ b/python/ray/dashboard/modules/job/job_supervisor.py @@ -68,10 +68,11 @@ def __init__( entrypoint: str, user_metadata: Dict[str, str], gcs_address: str, + logs_dir: Optional[str] = None, ): self._job_id = job_id gcs_aio_client = GcsAioClient(address=gcs_address) - self._job_info_client = JobInfoStorageClient(gcs_aio_client) + self._job_info_client = 
JobInfoStorageClient(gcs_aio_client, logs_dir) self._log_client = JobLogStorageClient() self._entrypoint = entrypoint diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index 54576c0b2a36..71182f1456d8 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -61,15 +61,14 @@ ], indirect=True, ) -async def test_submission_job_export_events(call_ray_start): # noqa: F811 +async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 """Submission job export events are correctly written""" ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient( address=address_info["gcs_address"], nums_reconnect_retry=0 ) - log_dir = ray._private.worker._global_node.get_logs_dir_path() - job_manager = JobManager(gcs_aio_client, log_dir) + job_manager = JobManager(gcs_aio_client, tmp_path) # Submit a job. 
submission_id = await job_manager.submit_job( @@ -82,7 +81,7 @@ async def test_submission_job_export_events(call_ray_start): # noqa: F811 ) # Verify export events are written - event_dir = f"{log_dir}/events" + event_dir = f"{tmp_path}/events" assert os.path.isdir(event_dir) event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" assert os.path.isfile(event_file) From 60e23aa5f9760487cc21e24021e9f5d0993e2d0f Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 4 Sep 2024 11:26:55 -0700 Subject: [PATCH 10/25] update Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 8b61a1666c80..0c87b11e7e1a 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -201,7 +201,7 @@ def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None): self._gcs_aio_client = gcs_aio_client self._export_submission_job_event_logger = None try: - if ray_constants.RAY_ENABLE_EXPORT_API_WRITE and log_dir: + if ray_constants.RAY_ENABLE_EXPORT_API_WRITE and log_dir is not None: self._export_submission_job_event_logger = get_export_event_logger( ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir ) @@ -238,6 +238,9 @@ async def put_info( def _write_submission_job_export_event( self, job_id: str, job_info: JobInfo ) -> None: + if not self._export_submission_job_event_logger: + return + status_value_descriptor = ( ExportSubmissionJobEventData.JobStatus.DESCRIPTOR.values_by_name.get( job_info.status.name @@ -265,9 +268,7 @@ def _write_submission_job_export_event( driver_node_id=job_info.driver_node_id, driver_exit_code=job_info.driver_exit_code, ) - - if self._export_submission_job_event_logger: - self._export_submission_job_event_logger.send_event(submission_event_data) + 
self._export_submission_job_event_logger.send_event(submission_event_data) async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]: serialized_info = await self._gcs_aio_client.internal_kv_get( From bfefd8319c362158aa62f54a74e800128b049a87 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 4 Sep 2024 11:35:45 -0700 Subject: [PATCH 11/25] update Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 0c87b11e7e1a..b32567e49ae3 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -232,7 +232,10 @@ async def put_info( ) if added_num == 1 or overwrite: # Write export event if data was updated in the KV store - self._write_submission_job_export_event(job_id, job_info) + try: + self._write_submission_job_export_event(job_id, job_info) + except Exception: + logger.exception("Error while writing job submission export event.") return added_num == 1 def _write_submission_job_export_event( From 9fd8989a60faeb6e406ba6c998160cde340959e3 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Wed, 4 Sep 2024 16:59:27 -0700 Subject: [PATCH 12/25] update build file rules Signed-off-by: Nikita Vemuri --- BUILD.bazel | 23 ++++--------------- .../ray/_private/event/export_event_logger.py | 4 ++-- .../modules/event/tests/test_event.py | 4 ++-- python/ray/dashboard/modules/job/common.py | 4 ++-- src/ray/protobuf/BUILD | 3 --- 5 files changed, 10 insertions(+), 28 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 04d0f329710c..e1fca20b762f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2509,6 +2509,7 @@ filegroup( "//src/ray/protobuf:runtime_env_agent_py_proto", "//src/ray/protobuf:runtime_env_common_py_proto", "//src/ray/protobuf:usage_py_proto", + "//src/ray/protobuf:export_event_py_proto", ], ) @@ -2519,17 +2520,6 @@ 
filegroup( ], ) -filegroup( - name = "export_api_py_proto", - srcs = [ - "//src/ray/protobuf:export_event_py_proto", - "//src/ray/protobuf:export_task_event_py_proto", - "//src/ray/protobuf:export_node_event_py_proto", - "//src/ray/protobuf:export_actor_event_py_proto", - "//src/ray/protobuf:export_submission_job_event_py_proto", - ], -) - # This is a dummy test dependency that causes the python tests to be # re-run if any of these files changes. py_library( @@ -2559,12 +2549,6 @@ copy_to_workspace( dstdir = "python/ray/serve/generated", ) -copy_to_workspace( - name = "cp_export_api_py_proto", - srcs = [":export_api_py_proto"], - dstdir = "python/ray/core/generated/export_api", -) - copy_to_workspace( name = "cp_redis", srcs = [ @@ -2597,7 +2581,6 @@ genrule( srcs = [ ":cp_all_py_proto", ":cp_serve_py_proto", - ":cp_export_api_py_proto", ], outs = ["install_py_proto.out"], cmd = """ @@ -2606,7 +2589,6 @@ genrule( # shellcheck disable=SC2006 files=( `ls python/ray/core/generated/*_pb2*.py` \ - `ls python/ray/core/generated/export_api/*_pb2*.py` \ `ls python/ray/serve/generated/*_pb2*.py` \ ) sed -i -E 's/from src.ray.protobuf/from ./' "$${files[@]}" @@ -2621,6 +2603,9 @@ genrule( # of experimental. 
autoscale_files=(`ls python/ray/core/generated/instance_manager_pb2*.py`) sed -i -E 's/from ..experimental/from ./' "$${autoscale_files[@]}" + # Help the generated export api files to have the correct module + export_api_files=(`ls python/ray/core/generated/export*_pb2*.py`) + sed -i -E 's/from ..export_api/from ./' "$${export_api_files[@]}" echo "$${PWD}" > $@ """, local = 1, diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 62d3861ba067..6f65e72e36f7 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -8,8 +8,8 @@ from typing import Union from datetime import datetime -from ray.core.generated.export_api.export_event_pb2 import ExportEvent -from ray.core.generated.export_api.export_submission_job_event_pb2 import ( +from ray.core.generated.export_event_pb2 import ExportEvent +from ray.core.generated.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, ) from ray._private.protobuf_compat import message_to_dict diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index ffdcb4cf6381..2671d22f1739 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -26,8 +26,8 @@ ) from ray._private.utils import binary_to_hex from ray.cluster_utils import AutoscalingCluster -from ray.core.generated import event_pb2 -from ray.core.generated.export_api import ( +from ray.core.generated import ( + event_pb2, export_event_pb2, export_submission_job_event_pb2, ) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index b32567e49ae3..ebff4eafc5bd 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -11,8 +11,8 @@ from ray._private.event.export_event_logger import get_export_event_logger 
from ray._private.gcs_utils import GcsAioClient from ray._private.runtime_env.packaging import parse_uri -from ray.core.generated.export_api.export_event_pb2 import ExportEvent -from ray.core.generated.export_api.export_submission_job_event_pb2 import ( +from ray.core.generated.export_event_pb2 import ExportEvent +from ray.core.generated.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, ) from ray.util.annotations import PublicAPI diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index bacf1fbe0481..37ab9a1de25a 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -245,9 +245,6 @@ python_grpc_compile( deps = [":event_proto"], ) -# Note: Any dependencies added to this rule should also be included -# in the :export_api_py_proto filegroup so the python files are -# correctly generated. proto_library( name = "export_event_proto", srcs = ["export_api/export_event.proto"], From aa280e84551a3c31f4d10932491eef7a57cd4d39 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 5 Sep 2024 10:14:30 -0700 Subject: [PATCH 13/25] merge Signed-off-by: Nikita Vemuri --- src/ray/protobuf/export_api/export_submission_job_event.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/protobuf/export_api/export_submission_job_event.proto b/src/ray/protobuf/export_api/export_submission_job_event.proto index 01898d663b3f..2ddb242548aa 100644 --- a/src/ray/protobuf/export_api/export_submission_job_event.proto +++ b/src/ray/protobuf/export_api/export_submission_job_event.proto @@ -62,5 +62,4 @@ message ExportSubmissionJobEventData { // The driver process exit code after the driver executed. 
Return None if driver // doesn't finish executing optional int32 driver_exit_code = 12; - } From aa280e84551a3c31f4d10932491eef7a57cd4d39 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 5 Sep 2024 10:24:18 -0700 Subject: [PATCH 14/25] add comments Signed-off-by: Nikita Vemuri --- .../ray/dashboard/modules/event/tests/test_event.py | 4 ++++ python/ray/dashboard/modules/job/common.py | 11 +++++++++++ .../dashboard/modules/job/tests/test_job_manager.py | 5 ++++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index 2671d22f1739..5c10c344112a 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -552,6 +552,10 @@ def verify(): def test_export_event_logger(tmp_path): + """ + Unit test a mock export event of type ExportSubmissionJobEventData + is correctly written to file. This doesn't test that events are correctly generated. + """ logger = get_export_event_logger( export_event_pb2.ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, str(tmp_path) ) diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index ebff4eafc5bd..20d2173ecfac 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -198,6 +198,12 @@ class JobInfoStorageClient: JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None): + """ + Initialize the JobInfoStorageClient which manages data in the internal KV store. + Export Submission Job events are written when the KV store is updated if + the feature flag is on and a log_dir is passed. log_dir doesn't need to be + passed if the caller is not modifying data in the KV store.
+ """ self._gcs_aio_client = gcs_aio_client self._export_submission_job_event_logger = None try: @@ -241,6 +247,11 @@ async def put_info( def _write_submission_job_export_event( self, job_id: str, job_info: JobInfo ) -> None: + """ + Write Submission Job export event if _export_submission_job_event_logger + exists. The logger will exist if the export API feature flag is enabled + and a log directory was passed to JobInfoStorageClient. + """ if not self._export_submission_job_event_logger: return diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index 71182f1456d8..f6417108ecc1 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -62,7 +62,10 @@ indirect=True, ) async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 - """Submission job export events are correctly written""" + """ + Test submission job events are correctly generated and written to file + as the job goes through various state changes in its lifecycle. 
+ """ ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True address_info = ray.init(address=call_ray_start) gcs_aio_client = GcsAioClient( From a15a690b921d547b4b669350c7897e709193ce78 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 5 Sep 2024 17:49:24 -0700 Subject: [PATCH 15/25] update Signed-off-by: Nikita Vemuri --- python/ray/_private/event/export_event_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 6f65e72e36f7..a6ddb4a699b5 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -21,7 +21,7 @@ ExportEventDataType = Union[ExportSubmissionJobEventData] -def get_event_id(): +def generate_event_id(): return "".join([random.choice(string.hexdigits) for _ in range(18)]) @@ -51,7 +51,7 @@ def send_event(self, event_data: ExportEventDataType): def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent: event = ExportEvent() - event.event_id = get_event_id() + event.event_id = generate_event_id() event.timestamp = int(datetime.now().timestamp()) if type(event_data) is ExportSubmissionJobEventData: event.submission_job_event_data.CopyFrom(event_data) From 3de365344f6c327f989521c4f500e8d428d2e123 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 12:58:35 -0700 Subject: [PATCH 16/25] update test Signed-off-by: Nikita Vemuri --- .../modules/job/tests/test_job_manager.py | 55 ----------- .../modules/tests/test_export_events.py | 93 +++++++++++++++++++ 2 files changed, 93 insertions(+), 55 deletions(-) create mode 100644 python/ray/dashboard/modules/tests/test_export_events.py diff --git a/python/ray/dashboard/modules/job/tests/test_job_manager.py b/python/ray/dashboard/modules/job/tests/test_job_manager.py index ddaeab03cbbb..c41937321a39 100644 --- a/python/ray/dashboard/modules/job/tests/test_job_manager.py +++ 
b/python/ray/dashboard/modules/job/tests/test_job_manager.py @@ -1,5 +1,4 @@ import asyncio -import json import os import signal import sys @@ -11,7 +10,6 @@ import pytest import ray -from ray._private import ray_constants from ray._private.gcs_utils import GcsAioClient from ray._private.ray_constants import ( DEFAULT_DASHBOARD_AGENT_LISTEN_PORT, @@ -48,59 +46,6 @@ import psutil -@pytest.mark.asyncio -@pytest.mark.parametrize( - "call_ray_start", - [ - { - "env": { - "RAY_enable_export_api_write": "true", - }, - "cmd": "ray start --head", - } - ], - indirect=True, -) -async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 - """ - Test submission job events are correctly generated and written to file - as the job goes through various state changes in its lifecycle. - """ - ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True - address_info = ray.init(address=call_ray_start) - gcs_aio_client = GcsAioClient( - address=address_info["gcs_address"], nums_reconnect_retry=0 - ) - job_manager = JobManager(gcs_aio_client, tmp_path) - - # Submit a job. - submission_id = await job_manager.submit_job( - entrypoint="python -c 'import ray; ray.init()'", - ) - - # Wait for the job to be finished. 
- await async_wait_for_condition_async_predicate( - check_job_succeeded, job_manager=job_manager, job_id=submission_id - ) - - # Verify export events are written - event_dir = f"{tmp_path}/events" - assert os.path.isdir(event_dir) - event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" - assert os.path.isfile(event_file) - - with open(event_file, "r") as f: - lines = f.readlines() - assert len(lines) == 3 - expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"] - - for line, expected_status in zip(lines, expected_status_values): - data = json.loads(line) - assert data["source_type"] == "EXPORT_SUBMISSION_JOB" - assert data["event_data"]["submission_job_id"] == submission_id - assert data["event_data"]["status"] == expected_status - - @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/tests/test_export_events.py new file mode 100644 index 000000000000..627d8aac6192 --- /dev/null +++ b/python/ray/dashboard/modules/tests/test_export_events.py @@ -0,0 +1,93 @@ +import asyncio +import json +import os +import signal +import sys +import tempfile +import time +import urllib.request +from uuid import uuid4 + +import pytest + +os.environ['RAY_enable_export_api_write'] = "true" + +import ray +from ray._private.gcs_utils import GcsAioClient +from ray._private.test_utils import ( + async_wait_for_condition_async_predicate, +) +from ray.dashboard.modules.job.job_manager import ( + JobManager, +) +from ray.job_submission import JobStatus +from ray.tests.conftest import call_ray_start # noqa: F401 + +import psutil + +async def check_job_succeeded(job_manager, job_id): + data = await job_manager.get_job_info(job_id) + status = data.status + if status == JobStatus.FAILED: + raise RuntimeError(f"Job failed! 
{data.message}") + assert status in {JobStatus.PENDING, JobStatus.RUNNING, JobStatus.SUCCEEDED} + if status == JobStatus.SUCCEEDED: + assert data.driver_exit_code == 0 + else: + assert data.driver_exit_code is None + return status == JobStatus.SUCCEEDED + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + "RAY_enable_export_api_write": "true", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) +async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 + """ + Test submission job events are correctly generated and written to file + as the job goes through various state changes in its lifecycle. + """ + + address_info = ray.init(address=call_ray_start) + gcs_aio_client = GcsAioClient( + address=address_info["gcs_address"], nums_reconnect_retry=0 + ) + job_manager = JobManager(gcs_aio_client, tmp_path) + + # Submit a job. + submission_id = await job_manager.submit_job( + entrypoint="ls", + ) + + # Wait for the job to be finished. 
+ await async_wait_for_condition_async_predicate( + check_job_succeeded, job_manager=job_manager, job_id=submission_id + ) + + # Verify export events are written + event_dir = f"{tmp_path}/events" + assert os.path.isdir(event_dir) + event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log" + assert os.path.isfile(event_file) + + with open(event_file, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"] + + for line, expected_status in zip(lines, expected_status_values): + data = json.loads(line) + assert data["source_type"] == "EXPORT_SUBMISSION_JOB" + assert data["event_data"]["submission_job_id"] == submission_id + assert data["event_data"]["status"] == expected_status + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__])) \ No newline at end of file From 9dd473d70ac0233b7de04b2dba0089ce6fda3354 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 13:00:10 -0700 Subject: [PATCH 17/25] formatting Signed-off-by: Nikita Vemuri --- .../modules/tests/test_export_events.py | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/tests/test_export_events.py index 627d8aac6192..ac680d825ade 100644 --- a/python/ray/dashboard/modules/tests/test_export_events.py +++ b/python/ray/dashboard/modules/tests/test_export_events.py @@ -1,29 +1,18 @@ -import asyncio import json import os -import signal import sys -import tempfile -import time -import urllib.request -from uuid import uuid4 import pytest -os.environ['RAY_enable_export_api_write'] = "true" - import ray from ray._private.gcs_utils import GcsAioClient -from ray._private.test_utils import ( - async_wait_for_condition_async_predicate, -) -from ray.dashboard.modules.job.job_manager import ( - JobManager, -) +from ray._private.test_utils import async_wait_for_condition_async_predicate +from 
ray.dashboard.modules.job.job_manager import JobManager from ray.job_submission import JobStatus from ray.tests.conftest import call_ray_start # noqa: F401 -import psutil +os.environ["RAY_enable_export_api_write"] = "true" + async def check_job_succeeded(job_manager, job_id): data = await job_manager.get_job_info(job_id) @@ -37,6 +26,7 @@ async def check_job_succeeded(job_manager, job_id): assert data.driver_exit_code is None return status == JobStatus.SUCCEEDED + @pytest.mark.asyncio @pytest.mark.parametrize( "call_ray_start", @@ -89,5 +79,6 @@ async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: assert data["event_data"]["submission_job_id"] == submission_id assert data["event_data"]["status"] == expected_status + if __name__ == "__main__": - sys.exit(pytest.main(["-v", __file__])) \ No newline at end of file + sys.exit(pytest.main(["-v", __file__])) From 04126881aedaecb72baf3d104c3c10234fee9402 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 13:12:08 -0700 Subject: [PATCH 18/25] formatting Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/tests/test_export_events.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/tests/test_export_events.py index ac680d825ade..a15de83d933d 100644 --- a/python/ray/dashboard/modules/tests/test_export_events.py +++ b/python/ray/dashboard/modules/tests/test_export_events.py @@ -1,9 +1,12 @@ +# isort: skip_file import json import os import sys import pytest +os.environ["RAY_enable_export_api_write"] = "true" + import ray from ray._private.gcs_utils import GcsAioClient from ray._private.test_utils import async_wait_for_condition_async_predicate @@ -11,8 +14,6 @@ from ray.job_submission import JobStatus from ray.tests.conftest import call_ray_start # noqa: F401 -os.environ["RAY_enable_export_api_write"] = "true" - async def check_job_succeeded(job_manager, job_id): data 
= await job_manager.get_job_info(job_id) From 5f41b82083a52946d75d42192c16637cfa8d75b5 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 13:13:51 -0700 Subject: [PATCH 19/25] formatting Signed-off-by: Nikita Vemuri --- .../dashboard/modules/event/tests/test_event.py | 16 ++++++++++------ .../ray/dashboard/modules/job/job_supervisor.py | 14 +++++++++----- .../modules/tests/test_export_events.py | 3 +++ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index 5c10c344112a..e0830b275f57 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -43,9 +43,11 @@ def _get_event(msg="empty message", job_id=None, source_type=None): return { "event_id": binary_to_hex(np.random.bytes(18)), - "source_type": random.choice(event_pb2.Event.SourceType.keys()) - if source_type is None - else source_type, + "source_type": ( + random.choice(event_pb2.Event.SourceType.keys()) + if source_type is None + else source_type + ), "host_name": "po-dev.inc.alipay.net", "pid": random.randint(1, 65536), "label": "", @@ -53,9 +55,11 @@ def _get_event(msg="empty message", job_id=None, source_type=None): "timestamp": time.time(), "severity": "INFO", "custom_fields": { - "job_id": ray.JobID.from_int(random.randint(1, 100)).hex() - if job_id is None - else job_id, + "job_id": ( + ray.JobID.from_int(random.randint(1, 100)).hex() + if job_id is None + else job_id + ), "node_id": "", "task_id": "", }, diff --git a/python/ray/dashboard/modules/job/job_supervisor.py b/python/ray/dashboard/modules/job/job_supervisor.py index 316c105f7e65..ec0f66dfa0a2 100644 --- a/python/ray/dashboard/modules/job/job_supervisor.py +++ b/python/ray/dashboard/modules/job/job_supervisor.py @@ -179,11 +179,15 @@ def _exec_entrypoint(self, logs_path: str) -> subprocess.Popen: # Ray intentionally blocks SIGINT in 
all processes, so if the user wants # to stop job through SIGINT, we need to unblock it in the child process preexec_fn=( - lambda: signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT}) - ) - if sys.platform != "win32" - and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT" - else None, + ( + lambda: signal.pthread_sigmask( + signal.SIG_UNBLOCK, {signal.SIGINT} + ) + ) + if sys.platform != "win32" + and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT" + else None + ), ) parent_pid = os.getpid() child_pid = child_process.pid diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/tests/test_export_events.py index a15de83d933d..b592af881267 100644 --- a/python/ray/dashboard/modules/tests/test_export_events.py +++ b/python/ray/dashboard/modules/tests/test_export_events.py @@ -5,6 +5,9 @@ import pytest +# RAY_enable_export_api_write env var must be set before importing +# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE +# even outside a Ray driver. 
os.environ["RAY_enable_export_api_write"] = "true" import ray From 8d6ff9d80ab6720d8179cc325e7ecfa019fdfff9 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 17:07:17 -0700 Subject: [PATCH 20/25] formatting Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/tests/test_export_events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/tests/test_export_events.py index b592af881267..013416eb8e41 100644 --- a/python/ray/dashboard/modules/tests/test_export_events.py +++ b/python/ray/dashboard/modules/tests/test_export_events.py @@ -1,4 +1,5 @@ # isort: skip_file +# flake8: noqa E402 import json import os import sys From 4790c6c2fc9163ed6fda0881348f4f2a4b621d7e Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 17:24:36 -0700 Subject: [PATCH 21/25] update Signed-off-by: Nikita Vemuri --- python/ray/_private/event/export_event_logger.py | 8 ++++---- python/ray/dashboard/modules/job/common.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index a6ddb4a699b5..9d66382dcf27 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -53,7 +53,7 @@ def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent: event = ExportEvent() event.event_id = generate_event_id() event.timestamp = int(datetime.now().timestamp()) - if type(event_data) is ExportSubmissionJobEventData: + if isinstance(event_data, ExportSubmissionJobEventData): event.submission_job_event_data.CopyFrom(event_data) event.source_type = ExportEvent.SourceType.EXPORT_SUBMISSION_JOB else: @@ -97,8 +97,8 @@ def _export_event_to_string(self, event: ExportEvent) -> str: return json.dumps(event_json) -def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: str): - 
logger = logging.getLogger("_ray_export_event_logger") +def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: str) -> logging.Logger: + logger = logging.getLogger("_ray_export_event_logger_" + source) logger.setLevel(logging.INFO) dir_path = pathlib.Path(sink_dir) / "events" filepath = dir_path / f"event_{source}.log" @@ -118,7 +118,7 @@ def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: st _export_event_logger = {} -def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str): +def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str) -> logging.Logger: """Get the export event logger of the current process. There's only one logger per export event source. diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 20d2173ecfac..3d87517f69a5 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -205,7 +205,7 @@ def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None): passed if the caller is not modifying data in the KV store. 
""" self._gcs_aio_client = gcs_aio_client - self._export_submission_job_event_logger = None + self._export_submission_job_event_logger: logging.Logger = None try: if ray_constants.RAY_ENABLE_EXPORT_API_WRITE and log_dir is not None: self._export_submission_job_event_logger = get_export_event_logger( From 65df2847471b23359a5e8e8517d3a0b3049a5e49 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 17:25:23 -0700 Subject: [PATCH 22/25] formatting Signed-off-by: Nikita Vemuri --- python/ray/_private/event/export_event_logger.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 9d66382dcf27..67902fc18901 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -97,7 +97,9 @@ def _export_event_to_string(self, event: ExportEvent) -> str: return json.dumps(event_json) -def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: str) -> logging.Logger: +def _build_export_event_file_logger( + source: ExportEvent.SourceType, sink_dir: str +) -> logging.Logger: logger = logging.getLogger("_ray_export_event_logger_" + source) logger.setLevel(logging.INFO) dir_path = pathlib.Path(sink_dir) / "events" @@ -118,7 +120,9 @@ def _build_export_event_file_logger(source: ExportEvent.SourceType, sink_dir: st _export_event_logger = {} -def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str) -> logging.Logger: +def get_export_event_logger( + source: ExportEvent.SourceType, sink_dir: str +) -> logging.Logger: """Get the export event logger of the current process. There's only one logger per export event source. 
From 3d91b9242214baf387ec0cb5a46da0fb30e8fdd1 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 17:38:26 -0700 Subject: [PATCH 23/25] update Signed-off-by: Nikita Vemuri --- python/ray/_private/event/export_event_logger.py | 7 +++++-- python/ray/_private/ray_constants.py | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py index 67902fc18901..217563996ed1 100644 --- a/python/ray/_private/event/export_event_logger.py +++ b/python/ray/_private/event/export_event_logger.py @@ -7,7 +7,7 @@ from typing import Union from datetime import datetime - +from ray._private import ray_constants from ray.core.generated.export_event_pb2 import ExportEvent from ray.core.generated.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, @@ -107,8 +107,11 @@ def _build_export_event_file_logger( dir_path.mkdir(exist_ok=True) filepath.touch(exist_ok=True) # Configure the logger. 
+ # Default is 100 MB max file size handler = logging.handlers.RotatingFileHandler( - filepath, maxBytes=(100 * 1e6), backupCount=20 # 100 MB max file size + filepath, + maxBytes=(ray_constants.RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES), + backupCount=ray_constants.RAY_EXPORT_EVENT_MAX_BACKUP_COUNT, ) logger.addHandler(handler) logger.propagate = False diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index c042a9a59e9d..244bbe42ab3f 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -509,3 +509,9 @@ def gcs_actor_scheduling_enabled(): RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON" RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False) + +RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = env_bool( + "RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 1e6 +) + +RAY_EXPORT_EVENT_MAX_BACKUP_COUNT = env_bool("RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20) From 3e725c930af9e2a163274ec0685c69d375ea0f10 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Thu, 19 Sep 2024 21:51:57 -0700 Subject: [PATCH 24/25] move test Signed-off-by: Nikita Vemuri --- .../tests/test_generate_export_events.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename python/ray/dashboard/modules/{tests/test_export_events.py => event/tests/test_generate_export_events.py} (100%) diff --git a/python/ray/dashboard/modules/tests/test_export_events.py b/python/ray/dashboard/modules/event/tests/test_generate_export_events.py similarity index 100% rename from python/ray/dashboard/modules/tests/test_export_events.py rename to python/ray/dashboard/modules/event/tests/test_generate_export_events.py From d44d9ba788173bcd830d2c9dc7276b6ac83a4d21 Mon Sep 17 00:00:00 2001 From: Nikita Vemuri Date: Mon, 23 Sep 2024 14:23:51 -0700 Subject: [PATCH 25/25] rename log dir Signed-off-by: Nikita Vemuri --- python/ray/dashboard/modules/job/common.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff 
--git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 3d87517f69a5..b928baab3aa2 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -197,19 +197,28 @@ class JobInfoStorageClient: JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" - def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None): + def __init__( + self, + gcs_aio_client: GcsAioClient, + export_event_log_dir_root: Optional[str] = None, + ): """ Initialize the JobInfoStorageClient which manages data in the internal KV store. Export Submission Job events are written when the KV store is updated if - the feature flag is on and a log_dir is passed. log_dir doesn't need to be - passed if the caller is not modifying data in the KV store. + the feature flag is on and a export_event_log_dir_root is passed. + export_event_log_dir_root doesn't need to be passed if the caller + is not modifying data in the KV store. """ self._gcs_aio_client = gcs_aio_client self._export_submission_job_event_logger: logging.Logger = None try: - if ray_constants.RAY_ENABLE_EXPORT_API_WRITE and log_dir is not None: + if ( + ray_constants.RAY_ENABLE_EXPORT_API_WRITE + and export_event_log_dir_root is not None + ): self._export_submission_job_event_logger = get_export_event_logger( - ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, log_dir + ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, + export_event_log_dir_root, ) except Exception: logger.exception(