diff --git a/BUILD.bazel b/BUILD.bazel
index 94a70cd9280f..c121daaf99c8 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -2525,6 +2525,7 @@ filegroup(
         "//src/ray/protobuf:runtime_env_agent_py_proto",
         "//src/ray/protobuf:runtime_env_common_py_proto",
         "//src/ray/protobuf:usage_py_proto",
+        "//src/ray/protobuf:export_event_py_proto",
     ],
 )
 
@@ -2618,6 +2619,9 @@ genrule(
         # of experimental.
         autoscale_files=(`ls python/ray/core/generated/instance_manager_pb2*.py`)
         sed -i -E 's/from ..experimental/from ./' "$${autoscale_files[@]}"
+        # Help the generated export api files to have the correct module
+        export_api_files=(`ls python/ray/core/generated/export*_pb2*.py`)
+        sed -i -E 's/from ..export_api/from ./' "$${export_api_files[@]}"
         echo "$${PWD}" > $@
     """,
     local = 1,
diff --git a/python/ray/_private/event/export_event_logger.py b/python/ray/_private/event/export_event_logger.py
new file mode 100644
index 000000000000..217563996ed1
--- /dev/null
+++ b/python/ray/_private/event/export_event_logger.py
@@ -0,0 +1,141 @@
+import logging
+import logging.handlers
+import pathlib
+import json
+import random
+import string
+import threading
+
+from typing import Union
+from datetime import datetime
+from ray._private import ray_constants
+from ray.core.generated.export_event_pb2 import ExportEvent
+from ray.core.generated.export_submission_job_event_pb2 import (
+    ExportSubmissionJobEventData,
+)
+from ray._private.protobuf_compat import message_to_dict
+
+global_logger = logging.getLogger(__name__)
+
+# This contains the union of export event data types which emit events
+# using the python ExportEventLoggerAdapter
+ExportEventDataType = Union[ExportSubmissionJobEventData]
+
+
+def generate_event_id():
+    return "".join([random.choice(string.hexdigits) for _ in range(18)])
+
+
+class ExportEventLoggerAdapter:
+    def __init__(self, source: ExportEvent.SourceType, logger: logging.Logger):
+        """Adapter for the Python logger that's used to emit export events."""
+        self.logger = logger
+        self.source = source
+
+    def send_event(self, event_data: ExportEventDataType):
+        # NOTE: Python logger is thread-safe,
+        # so we don't need to protect it using locks.
+        try:
+            event = self._create_export_event(event_data)
+        except TypeError:
+            global_logger.exception(
+                "Failed to create ExportEvent from event_data so no "
+                "event will be written to file."
+            )
+            return
+
+        event_as_str = self._export_event_to_string(event)
+
+        self.logger.info(event_as_str)
+        # Force flush so that we won't lose events
+        self.logger.handlers[0].flush()
+
+    def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent:
+        event = ExportEvent()
+        event.event_id = generate_event_id()
+        event.timestamp = int(datetime.now().timestamp())
+        if isinstance(event_data, ExportSubmissionJobEventData):
+            event.submission_job_event_data.CopyFrom(event_data)
+            event.source_type = ExportEvent.SourceType.EXPORT_SUBMISSION_JOB
+        else:
+            raise TypeError(f"Invalid event_data type: {type(event_data)}")
+        if event.source_type != self.source:
+            global_logger.error(
+                f"event_data has source type {event.source_type}, however "
+                f"the event was sent to a logger with source {self.source}. "
+                f"The event will still be written to the file of {self.source} "
+                "but this indicates a bug in the code."
+            )
+        return event
+
+    def _export_event_to_string(self, event: ExportEvent) -> str:
+        event_data_json = {}
+        proto_to_dict_options = {
+            "always_print_fields_with_no_presence": True,
+            "preserving_proto_field_name": True,
+            "use_integers_for_enums": False,
+        }
+        event_data_field_set = event.WhichOneof("event_data")
+        if event_data_field_set:
+            event_data_json = message_to_dict(
+                getattr(event, event_data_field_set),
+                **proto_to_dict_options,
+            )
+        else:
+            global_logger.error(
+                f"event_data missing from export event with id {event.event_id} "
+                f"and type {event.source_type}. An empty event will be written, "
+                "but this indicates a bug in the code."
+            )
+        event_json = {
+            "event_id": event.event_id,
+            "timestamp": event.timestamp,
+            "source_type": ExportEvent.SourceType.Name(event.source_type),
+            "event_data": event_data_json,
+        }
+        return json.dumps(event_json)
+
+
+def _build_export_event_file_logger(source: str, sink_dir: str) -> logging.Logger:
+    logger = logging.getLogger("_ray_export_event_logger_" + source)
+    logger.setLevel(logging.INFO)
+    dir_path = pathlib.Path(sink_dir) / "events"
+    filepath = dir_path / f"event_{source}.log"
+    dir_path.mkdir(exist_ok=True)
+    filepath.touch(exist_ok=True)
+    # Configure the logger.
+    # Default is 100 MB max file size
+    handler = logging.handlers.RotatingFileHandler(
+        filepath,
+        maxBytes=(ray_constants.RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES),
+        backupCount=ray_constants.RAY_EXPORT_EVENT_MAX_BACKUP_COUNT,
+    )
+    logger.addHandler(handler)
+    logger.propagate = False
+    return logger
+
+
+# This lock must be used when accessing or updating global event logger dict.
+_export_event_logger_lock = threading.Lock()
+_export_event_logger = {}
+
+
+def get_export_event_logger(
+    source: ExportEvent.SourceType, sink_dir: str
+) -> ExportEventLoggerAdapter:
+    """Get the export event logger of the current process.
+
+    There's only one logger per export event source.
+
+    Args:
+        source: The source of the export event.
+        sink_dir: The directory to sink event logs.
+ """ + with _export_event_logger_lock: + global _export_event_logger + source_name = ExportEvent.SourceType.Name(source) + if source_name not in _export_event_logger: + logger = _build_export_event_file_logger(source_name, sink_dir) + _export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger) + + return _export_event_logger[source_name] diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 5ed290345843..244bbe42ab3f 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -507,3 +507,11 @@ def gcs_actor_scheduling_enabled(): RAY_LOGGING_CONFIG_ENCODING = os.environ.get("RAY_LOGGING_CONFIG_ENCODING") RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON" + +RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False) + +RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = env_bool( + "RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 1e6 +) + +RAY_EXPORT_EVENT_MAX_BACKUP_COUNT = env_bool("RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20) diff --git a/python/ray/dashboard/modules/event/tests/test_event.py b/python/ray/dashboard/modules/event/tests/test_event.py index c7dbb18aafc4..e0830b275f57 100644 --- a/python/ray/dashboard/modules/event/tests/test_event.py +++ b/python/ray/dashboard/modules/event/tests/test_event.py @@ -17,6 +17,8 @@ import ray from ray._private.event.event_logger import filter_event_by_level, get_event_logger +from ray._private.event.export_event_logger import get_export_event_logger +from ray._private.protobuf_compat import message_to_dict from ray._private.test_utils import ( format_web_url, wait_for_condition, @@ -24,7 +26,11 @@ ) from ray._private.utils import binary_to_hex from ray.cluster_utils import AutoscalingCluster -from ray.core.generated import event_pb2 +from ray.core.generated import ( + event_pb2, + export_event_pb2, + export_submission_job_event_pb2, +) from ray.dashboard.modules.event import event_consts from ray.dashboard.modules.event.event_utils import monitor_events from ray.dashboard.tests.conftest import * # noqa @@ -37,9 +43,11 @@ def _get_event(msg="empty message", job_id=None, source_type=None): return { "event_id": binary_to_hex(np.random.bytes(18)), - "source_type": random.choice(event_pb2.Event.SourceType.keys()) - if source_type is None - else source_type, + "source_type": ( + random.choice(event_pb2.Event.SourceType.keys()) + if source_type is None + else source_type + ), "host_name": "po-dev.inc.alipay.net", "pid": random.randint(1, 65536), "label": "", @@ -47,9 +55,11 @@ def _get_event(msg="empty message", job_id=None, source_type=None): "timestamp": time.time(), "severity": "INFO", "custom_fields": { - "job_id": ray.JobID.from_int(random.randint(1, 100)).hex() - if job_id is None - else job_id, + "job_id": ( + ray.JobID.from_int(random.randint(1, 100)).hex() + if job_id is None + else job_id + ), "node_id": "", "task_id": "", }, @@ -545,5 +555,44 @@ def verify(): pprint(list_cluster_events()) +def test_export_event_logger(tmp_path): + """ + Unit test a mock export event of type ExportSubmissionJobEventData + is correctly written to file. This doesn't events are correctly generated. 
+ """ + logger = get_export_event_logger( + export_event_pb2.ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, str(tmp_path) + ) + ExportSubmissionJobEventData = ( + export_submission_job_event_pb2.ExportSubmissionJobEventData + ) + event_data = ExportSubmissionJobEventData( + submission_job_id="submission_job_id0", + status=ExportSubmissionJobEventData.JobStatus.RUNNING, + entrypoint="ls", + metadata={}, + ) + logger.send_event(event_data) + + event_dir = tmp_path / "events" + assert event_dir.exists() + event_file = event_dir / "event_EXPORT_SUBMISSION_JOB.log" + assert event_file.exists() + + with event_file.open() as f: + lines = f.readlines() + assert len(lines) == 1 + + line = lines[0] + data = json.loads(line) + assert data["source_type"] == "EXPORT_SUBMISSION_JOB" + assert data["event_data"] == message_to_dict( + event_data, + always_print_fields_with_no_presence=True, + preserving_proto_field_name=True, + use_integers_for_enums=False, + ) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/dashboard/modules/event/tests/test_generate_export_events.py b/python/ray/dashboard/modules/event/tests/test_generate_export_events.py new file mode 100644 index 000000000000..013416eb8e41 --- /dev/null +++ b/python/ray/dashboard/modules/event/tests/test_generate_export_events.py @@ -0,0 +1,89 @@ +# isort: skip_file +# flake8: noqa E402 +import json +import os +import sys + +import pytest + +# RAY_enable_export_api_write env var must be set before importing +# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE +# even outside a Ray driver. +os.environ["RAY_enable_export_api_write"] = "true" + +import ray +from ray._private.gcs_utils import GcsAioClient +from ray._private.test_utils import async_wait_for_condition_async_predicate +from ray.dashboard.modules.job.job_manager import JobManager +from ray.job_submission import JobStatus +from ray.tests.conftest import call_ray_start # noqa: F401 + + +async def check_job_succeeded(job_manager, job_id): + data = await job_manager.get_job_info(job_id) + status = data.status + if status == JobStatus.FAILED: + raise RuntimeError(f"Job failed! {data.message}") + assert status in {JobStatus.PENDING, JobStatus.RUNNING, JobStatus.SUCCEEDED} + if status == JobStatus.SUCCEEDED: + assert data.driver_exit_code == 0 + else: + assert data.driver_exit_code is None + return status == JobStatus.SUCCEEDED + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "call_ray_start", + [ + { + "env": { + "RAY_enable_export_api_write": "true", + }, + "cmd": "ray start --head", + } + ], + indirect=True, +) +async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811 + """ + Test submission job events are correctly generated and written to file + as the job goes through various state changes in its lifecycle. + """ + + address_info = ray.init(address=call_ray_start) + gcs_aio_client = GcsAioClient( + address=address_info["gcs_address"], nums_reconnect_retry=0 + ) + job_manager = JobManager(gcs_aio_client, tmp_path) + + # Submit a job. + submission_id = await job_manager.submit_job( + entrypoint="ls", + ) + + # Wait for the job to be finished. 
+    await async_wait_for_condition_async_predicate(
+        check_job_succeeded, job_manager=job_manager, job_id=submission_id
+    )
+
+    # Verify export events are written
+    event_dir = f"{tmp_path}/events"
+    assert os.path.isdir(event_dir)
+    event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log"
+    assert os.path.isfile(event_file)
+
+    with open(event_file, "r") as f:
+        lines = f.readlines()
+        assert len(lines) == 3
+        expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"]
+
+        for line, expected_status in zip(lines, expected_status_values):
+            data = json.loads(line)
+            assert data["source_type"] == "EXPORT_SUBMISSION_JOB"
+            assert data["event_data"]["submission_job_id"] == submission_id
+            assert data["event_data"]["status"] == expected_status
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py
index 06613ec27e42..b928baab3aa2 100644
--- a/python/ray/dashboard/modules/job/common.py
+++ b/python/ray/dashboard/modules/job/common.py
@@ -1,5 +1,6 @@
 import asyncio
 import json
+import logging
 import time
 from dataclasses import asdict, dataclass, replace
 from enum import Enum
@@ -7,8 +8,13 @@ from typing import Any, Dict, Optional, Tuple, Union
 
 from ray._private import ray_constants
+from ray._private.event.export_event_logger import ExportEventLoggerAdapter, get_export_event_logger
 from ray._private.gcs_utils import GcsAioClient
 from ray._private.runtime_env.packaging import parse_uri
+from ray.core.generated.export_event_pb2 import ExportEvent
+from ray.core.generated.export_submission_job_event_pb2 import (
+    ExportSubmissionJobEventData,
+)
 from ray.util.annotations import PublicAPI
 
 # NOTE(edoakes): these constants should be considered a public API because
@@ -23,6 +29,8 @@
 SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE"
 JOB_LOGS_PATH_TEMPLATE = "job-driver-{submission_id}.log"
 
+logger = logging.getLogger(__name__)
+
 
 @PublicAPI(stability="stable")
 class JobStatus(str, Enum):
@@ -189,8 +197,34 @@ class JobInfoStorageClient:
     JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
     JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"
 
-    def __init__(self, gcs_aio_client: GcsAioClient):
+    def __init__(
+        self,
+        gcs_aio_client: GcsAioClient,
+        export_event_log_dir_root: Optional[str] = None,
+    ):
+        """
+        Initialize the JobInfoStorageClient which manages data in the internal KV store.
+        Export Submission Job events are written when the KV store is updated if
+        the feature flag is on and an export_event_log_dir_root is passed.
+        export_event_log_dir_root doesn't need to be passed if the caller
+        is not modifying data in the KV store.
+        """
         self._gcs_aio_client = gcs_aio_client
+        self._export_submission_job_event_logger: Optional[ExportEventLoggerAdapter] = None
+        try:
+            if (
+                ray_constants.RAY_ENABLE_EXPORT_API_WRITE
+                and export_event_log_dir_root is not None
+            ):
+                self._export_submission_job_event_logger = get_export_event_logger(
+                    ExportEvent.SourceType.EXPORT_SUBMISSION_JOB,
+                    export_event_log_dir_root,
+                )
+        except Exception:
+            logger.exception(
+                "Unable to initialize export event logger so no export "
+                "events will be written."
+            )
 
     async def put_info(
         self, job_id: str, job_info: JobInfo, overwrite: bool = True
@@ -211,8 +245,54 @@ async def put_info(
             overwrite,
             namespace=ray_constants.KV_NAMESPACE_JOB,
         )
+        if added_num == 1 or overwrite:
+            # Write export event if data was updated in the KV store
+            try:
+                self._write_submission_job_export_event(job_id, job_info)
+            except Exception:
+                logger.exception("Error while writing job submission export event.")
         return added_num == 1
 
+    def _write_submission_job_export_event(
+        self, job_id: str, job_info: JobInfo
+    ) -> None:
+        """
+        Write a Submission Job export event if _export_submission_job_event_logger
+        exists. The logger will exist if the export API feature flag is enabled
+        and a log directory was passed to JobInfoStorageClient.
+        """
+        if not self._export_submission_job_event_logger:
+            return
+
+        status_value_descriptor = (
+            ExportSubmissionJobEventData.JobStatus.DESCRIPTOR.values_by_name.get(
+                job_info.status.name
+            )
+        )
+        if status_value_descriptor is None:
+            logger.error(
+                f"{job_info.status.name} is not a valid "
+                "ExportSubmissionJobEventData.JobStatus enum value. This event "
+                "will not be written."
+            )
+            return
+        job_status = status_value_descriptor.number
+        submission_event_data = ExportSubmissionJobEventData(
+            submission_job_id=job_id,
+            status=job_status,
+            entrypoint=job_info.entrypoint,
+            message=job_info.message,
+            metadata=job_info.metadata,
+            error_type=job_info.error_type,
+            start_time=job_info.start_time,
+            end_time=job_info.end_time,
+            runtime_env_json=json.dumps(job_info.runtime_env),
+            driver_agent_http_address=job_info.driver_agent_http_address,
+            driver_node_id=job_info.driver_node_id,
+            driver_exit_code=job_info.driver_exit_code,
+        )
+        self._export_submission_job_event_logger.send_event(submission_event_data)
+
     async def get_info(self, job_id: str, timeout: int = 30) -> Optional[JobInfo]:
         serialized_info = await self._gcs_aio_client.internal_kv_get(
             self.JOB_DATA_KEY.format(job_id=job_id).encode(),
diff --git a/python/ray/dashboard/modules/job/job_manager.py b/python/ray/dashboard/modules/job/job_manager.py
index ff14215e03a3..25847f5b0fbe 100644
--- a/python/ray/dashboard/modules/job/job_manager.py
+++ b/python/ray/dashboard/modules/job/job_manager.py
@@ -70,7 +70,8 @@ class JobManager:
 
     def __init__(self, gcs_aio_client: GcsAioClient, logs_dir: str):
         self._gcs_aio_client = gcs_aio_client
-        self._job_info_client = JobInfoStorageClient(gcs_aio_client)
+        self._logs_dir = logs_dir
+        self._job_info_client = JobInfoStorageClient(gcs_aio_client, logs_dir)
         self._gcs_address = gcs_aio_client.address
         self._cluster_id_hex = gcs_aio_client.cluster_id.hex()
         self._log_client = JobLogStorageClient()
@@ -544,6 +545,7 @@ async def submit_job(
             metadata or {},
             self._gcs_address,
             self._cluster_id_hex,
+            self._logs_dir,
         )
         supervisor.run.remote(
             _start_signal_actor=_start_signal_actor,
diff --git a/python/ray/dashboard/modules/job/job_supervisor.py b/python/ray/dashboard/modules/job/job_supervisor.py
index 18974cd1ff4b..ec0f66dfa0a2 100644
--- a/python/ray/dashboard/modules/job/job_supervisor.py
+++ b/python/ray/dashboard/modules/job/job_supervisor.py
@@ -71,10 +71,11 @@ def __init__(
         user_metadata: Dict[str, str],
         gcs_address: str,
         cluster_id_hex: str,
+        logs_dir: Optional[str] = None,
     ):
         self._job_id = job_id
         gcs_aio_client = GcsAioClient(address=gcs_address, cluster_id=cluster_id_hex)
-        self._job_info_client = JobInfoStorageClient(gcs_aio_client)
+        self._job_info_client = JobInfoStorageClient(gcs_aio_client, logs_dir)
         self._log_client = JobLogStorageClient()
         self._entrypoint = entrypoint
 
@@ -178,11 +179,15 @@
             # Ray intentionally blocks SIGINT in all processes, so if the user wants
             # to stop job through SIGINT, we need to unblock it in the child process
             preexec_fn=(
-                lambda: signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})
-            )
-            if sys.platform != "win32"
-            and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT"
-            else None,
+                (
+                    lambda: signal.pthread_sigmask(
+                        signal.SIG_UNBLOCK, {signal.SIGINT}
+                    )
+                )
+                if sys.platform != "win32"
+                and os.environ.get("RAY_JOB_STOP_SIGNAL") == "SIGINT"
+                else None
+            ),
         )
         parent_pid = os.getpid()
         child_pid = child_process.pid
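
Usage sketch for the new export event logger, mirroring test_export_event_logger above. The sink directory and the event field values here are illustrative; in Ray itself the caller passes the session log directory.

import tempfile

from ray._private.event.export_event_logger import get_export_event_logger
from ray.core.generated.export_event_pb2 import ExportEvent
from ray.core.generated.export_submission_job_event_pb2 import (
    ExportSubmissionJobEventData,
)

# Illustrative sink directory; Ray passes its own session/log directory.
sink_dir = tempfile.mkdtemp()
logger = get_export_event_logger(
    ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, sink_dir
)
event_data = ExportSubmissionJobEventData(
    submission_job_id="raysubmit_example",  # hypothetical submission id
    status=ExportSubmissionJobEventData.JobStatus.RUNNING,
    entrypoint="ls",
    metadata={},
)
logger.send_event(event_data)
# Appends one JSON line to <sink_dir>/events/event_EXPORT_SUBMISSION_JOB.log:
# {"event_id": ..., "timestamp": ..., "source_type": "EXPORT_SUBMISSION_JOB",
#  "event_data": {"submission_job_id": "raysubmit_example", ...}}

Note that send_event force-flushes the underlying RotatingFileHandler, so the JSON line is on disk by the time the call returns.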
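End-to-end sketch of the write path behind the feature flag, following test_submission_job_export_events: JobManager forwards its logs_dir to JobInfoStorageClient, which builds the export event logger only when RAY_ENABLE_EXPORT_API_WRITE is true. The flag must be set before `ray` is imported, and the log directory below is hypothetical.

import asyncio
import os

# Must be set before importing ray so ray_constants picks it up.
os.environ["RAY_enable_export_api_write"] = "true"

import ray  # noqa: E402
from ray._private.gcs_utils import GcsAioClient  # noqa: E402
from ray.dashboard.modules.job.job_manager import JobManager  # noqa: E402


async def main():
    address_info = ray.init()
    gcs_aio_client = GcsAioClient(
        address=address_info["gcs_address"], nums_reconnect_retry=0
    )
    # logs_dir (hypothetical path) is forwarded to JobInfoStorageClient,
    # which creates the export event logger when the flag is on.
    job_manager = JobManager(gcs_aio_client, "/tmp/ray_export_demo")
    submission_id = await job_manager.submit_job(entrypoint="ls")
    # Each status transition (PENDING, RUNNING, SUCCEEDED) appends one line to
    # /tmp/ray_export_demo/events/event_EXPORT_SUBMISSION_JOB.log
    return submission_id


print(asyncio.run(main()))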