[observability][export-api] Write submission job events #47468

Open
wants to merge 27 commits into base: master
4 changes: 4 additions & 0 deletions BUILD.bazel
@@ -2525,6 +2525,7 @@ filegroup(
"//src/ray/protobuf:runtime_env_agent_py_proto",
"//src/ray/protobuf:runtime_env_common_py_proto",
"//src/ray/protobuf:usage_py_proto",
"//src/ray/protobuf:export_event_py_proto",
],
)

@@ -2618,6 +2619,9 @@ genrule(
# of experimental.
autoscale_files=(`ls python/ray/core/generated/instance_manager_pb2*.py`)
sed -i -E 's/from ..experimental/from ./' "$${autoscale_files[@]}"
# Help the generated export api files to have the correct module
export_api_files=(`ls python/ray/core/generated/export*_pb2*.py`)
sed -i -E 's/from ..export_api/from ./' "$${export_api_files[@]}"
echo "$${PWD}" > $@
""",
local = 1,
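For context, a rough Python equivalent, purely illustrative, of what the sed rewrite in the genrule above does to the generated protobuf modules; the exact import lines inside the generated files are an assumption based on the substitution pattern, not copied from those files:

import glob
import re

# The export-API protos are generated under an export_api package, so the
# generated export*_pb2.py files import their siblings via "from ..export_api".
# Once copied into python/ray/core/generated/, those imports must become
# relative to the current package, i.e. "from .".
for path in glob.glob("python/ray/core/generated/export*_pb2*.py"):
    with open(path) as f:
        src = f.read()
    patched = re.sub(r"from \.\.export_api", "from .", src)
    with open(path, "w") as f:
        f.write(patched)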
144 changes: 144 additions & 0 deletions python/ray/_private/event/export_event_logger.py
@@ -0,0 +1,144 @@
import logging
import logging.handlers
import pathlib
import json
import random
import string
import threading

from typing import Union
from datetime import datetime
from ray._private import ray_constants
from ray.core.generated.export_event_pb2 import ExportEvent
from ray.core.generated.export_submission_job_event_pb2 import (
ExportSubmissionJobEventData,
)
from ray._private.protobuf_compat import message_to_dict

global_logger = logging.getLogger(__name__)

# This contains the union of export event data types which emit events
# using the python ExportEventLoggerAdapter
ExportEventDataType = Union[ExportSubmissionJobEventData]


def generate_event_id():
return "".join([random.choice(string.hexdigits) for _ in range(18)])


class ExportEventLoggerAdapter:
def __init__(self, source: ExportEvent.SourceType, logger: logging.Logger):
"""Adapter for the Python logger that's used to emit export events."""
self.logger = logger
self.source = source

def send_event(self, event_data: ExportEventDataType):
# NOTE: Python logger is thread-safe,
# so we don't need to protect it using locks.
try:
event = self._create_export_event(event_data)
except TypeError:
global_logger.exception(
"Failed to create ExportEvent from event_data so no "
"event will be written to file."
)
return

event_as_str = self._export_event_to_string(event)

self.logger.info(event_as_str)
# Force flush so that we won't lose events
self.logger.handlers[0].flush()
Comment on lines +49 to +50

Contributor

is this a pattern we follow in the other event exporters as well?

I think batched flushing is reasonable if we get significant performance improvements at the cost of some reliability

Contributor

since it is only used by a job, it is unlikely to cause a problem, but I agree it is probably okay to lose some events here... (I don't think we guarantee flushing every event after it is finished)

Contributor Author

Yeah, I added this behavior for consistency with the EventLoggerAdapter for existing python events:

self.logger.handlers[0].flush()

The LogEventReporter for C++ events also flushes after each event by default:

bool force_flush = true,

Contributor Author

The export events from the python layer are unlikely to be created with very high volume (unlike tasks), so we can probably just keep this flush logic for now for consistency and optimize later if needed.
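To make the trade-off discussed in this thread concrete, here is a minimal sketch, not part of this PR, of what batched flushing could look like; the class name, the flush_every threshold, and where it would hook into the adapter are all hypothetical:

import logging
import threading


class BatchedFlusher:
    # Hypothetical sketch: flush the underlying handler only every
    # `flush_every` events instead of after each logger.info() call,
    # trading a small chance of losing buffered events for fewer flushes.
    def __init__(self, handler: logging.Handler, flush_every: int = 32):
        self._handler = handler
        self._flush_every = flush_every
        self._pending = 0
        self._lock = threading.Lock()

    def record_event(self) -> None:
        with self._lock:
            self._pending += 1
            if self._pending >= self._flush_every:
                self._handler.flush()
                self._pending = 0

    def close(self) -> None:
        # Flush whatever is left, e.g. at shutdown.
        with self._lock:
            self._handler.flush()
            self._pending = 0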


def _create_export_event(self, event_data: ExportEventDataType) -> ExportEvent:
event = ExportEvent()
event.event_id = generate_event_id()
event.timestamp = int(datetime.now().timestamp())
if isinstance(event_data, ExportSubmissionJobEventData):
event.submission_job_event_data.CopyFrom(event_data)
event.source_type = ExportEvent.SourceType.EXPORT_SUBMISSION_JOB
else:
raise TypeError(f"Invalid event_data type: {type(event_data)}")
if event.source_type != self.source:
global_logger.error(
f"event_data has source type {event.source_type}, however "
f"the event was sent to a logger with source {self.source}. "
f"The event will still be written to the file of {self.source} "
"but this indicates a bug in the code."
)
pass
return event

def _export_event_to_string(self, event: ExportEvent) -> str:
event_data_json = {}
proto_to_dict_options = {
"always_print_fields_with_no_presence": True,
"preserving_proto_field_name": True,
"use_integers_for_enums": False,
}
event_data_field_set = event.WhichOneof("event_data")
if event_data_field_set:
event_data_json = message_to_dict(
getattr(event, event_data_field_set),
**proto_to_dict_options,
)
else:
global_logger.error(
f"event_data missing from export event with id {event.event_id} "
f"and type {event.source_type}. An empty event will be written, "
"but this indicates a bug in the code. "
)
pass
event_json = {
"event_id": event.event_id,
"timestamp": event.timestamp,
"source_type": ExportEvent.SourceType.Name(event.source_type),
"event_data": event_data_json,
}
return json.dumps(event_json)


def _build_export_event_file_logger(source: str, sink_dir: str) -> logging.Logger:
logger = logging.getLogger("_ray_export_event_logger_" + source)
logger.setLevel(logging.INFO)
dir_path = pathlib.Path(sink_dir) / "events"
filepath = dir_path / f"event_{source}.log"
dir_path.mkdir(exist_ok=True)
filepath.touch(exist_ok=True)
# Configure the logger.
# Default is 100 MB max file size
handler = logging.handlers.RotatingFileHandler(
filepath,
maxBytes=(ray_constants.RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES),
backupCount=ray_constants.RAY_EXPORT_EVENT_MAX_BACKUP_COUNT,
)
logger.addHandler(handler)
logger.propagate = False
return logger


# This lock must be used when accessing or updating global event logger dict.
_export_event_logger_lock = threading.Lock()
_export_event_logger = {}


def get_export_event_logger(
    source: ExportEvent.SourceType, sink_dir: str
) -> ExportEventLoggerAdapter:
"""Get the export event logger of the current process.

There's only one logger per export event source.

Args:
source: The source of the export event.
sink_dir: The directory to sink event logs.
"""
with _export_event_logger_lock:
global _export_event_logger
source_name = ExportEvent.SourceType.Name(source)
if source_name not in _export_event_logger:
logger = _build_export_event_file_logger(source_name, sink_dir)
_export_event_logger[source_name] = ExportEventLoggerAdapter(source, logger)

return _export_event_logger[source_name]
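For reference, a minimal usage sketch of the logger API above; the sink directory is a placeholder, and the source type and event fields mirror the unit test added in this PR:

from ray._private.event.export_event_logger import get_export_event_logger
from ray.core.generated.export_event_pb2 import ExportEvent
from ray.core.generated.export_submission_job_event_pb2 import (
    ExportSubmissionJobEventData,
)

# Placeholder sink directory; it must already exist (the "events"
# subdirectory underneath it is created by the logger).
logger = get_export_event_logger(
    ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, "/tmp"
)
logger.send_event(
    ExportSubmissionJobEventData(
        submission_job_id="submission_job_id0",
        status=ExportSubmissionJobEventData.JobStatus.RUNNING,
        entrypoint="ls",
        metadata={},
    )
)
# Appends one JSON line to /tmp/events/event_EXPORT_SUBMISSION_JOB.log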
8 changes: 8 additions & 0 deletions python/ray/_private/ray_constants.py
@@ -507,3 +507,11 @@ def gcs_actor_scheduling_enabled():
RAY_LOGGING_CONFIG_ENCODING = os.environ.get("RAY_LOGGING_CONFIG_ENCODING")

RAY_BACKEND_LOG_JSON_ENV_VAR = "RAY_BACKEND_LOG_JSON"

RAY_ENABLE_EXPORT_API_WRITE = env_bool("RAY_enable_export_api_write", False)

RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES = env_integer(
    "RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 1e6
)

RAY_EXPORT_EVENT_MAX_BACKUP_COUNT = env_integer(
    "RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20
)
63 changes: 56 additions & 7 deletions python/ray/dashboard/modules/event/tests/test_event.py
@@ -17,14 +17,20 @@

import ray
from ray._private.event.event_logger import filter_event_by_level, get_event_logger
from ray._private.event.export_event_logger import get_export_event_logger
from ray._private.protobuf_compat import message_to_dict
from ray._private.test_utils import (
format_web_url,
wait_for_condition,
wait_until_server_available,
)
from ray._private.utils import binary_to_hex
from ray.cluster_utils import AutoscalingCluster
from ray.core.generated import event_pb2
from ray.core.generated import (
event_pb2,
export_event_pb2,
export_submission_job_event_pb2,
)
from ray.dashboard.modules.event import event_consts
from ray.dashboard.modules.event.event_utils import monitor_events
from ray.dashboard.tests.conftest import * # noqa
@@ -37,19 +43,23 @@
def _get_event(msg="empty message", job_id=None, source_type=None):
return {
"event_id": binary_to_hex(np.random.bytes(18)),
"source_type": random.choice(event_pb2.Event.SourceType.keys())
if source_type is None
else source_type,
"source_type": (
random.choice(event_pb2.Event.SourceType.keys())
if source_type is None
else source_type
),
"host_name": "po-dev.inc.alipay.net",
"pid": random.randint(1, 65536),
"label": "",
"message": msg,
"timestamp": time.time(),
"severity": "INFO",
"custom_fields": {
"job_id": ray.JobID.from_int(random.randint(1, 100)).hex()
if job_id is None
else job_id,
"job_id": (
ray.JobID.from_int(random.randint(1, 100)).hex()
if job_id is None
else job_id
),
"node_id": "",
"task_id": "",
},
@@ -545,5 +555,44 @@ def verify():
pprint(list_cluster_events())


def test_export_event_logger(tmp_path):
"""
Unit test that a mock export event of type ExportSubmissionJobEventData
is correctly written to file. This doesn't verify that events are correctly generated.
"""
logger = get_export_event_logger(
export_event_pb2.ExportEvent.SourceType.EXPORT_SUBMISSION_JOB, str(tmp_path)
)
ExportSubmissionJobEventData = (
export_submission_job_event_pb2.ExportSubmissionJobEventData
)
event_data = ExportSubmissionJobEventData(
submission_job_id="submission_job_id0",
status=ExportSubmissionJobEventData.JobStatus.RUNNING,
entrypoint="ls",
metadata={},
)
logger.send_event(event_data)

event_dir = tmp_path / "events"
assert event_dir.exists()
event_file = event_dir / "event_EXPORT_SUBMISSION_JOB.log"
assert event_file.exists()

with event_file.open() as f:
lines = f.readlines()
assert len(lines) == 1

line = lines[0]
data = json.loads(line)
assert data["source_type"] == "EXPORT_SUBMISSION_JOB"
assert data["event_data"] == message_to_dict(
event_data,
always_print_fields_with_no_presence=True,
preserving_proto_field_name=True,
use_integers_for_enums=False,
)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,89 @@
# isort: skip_file
# flake8: noqa E402
import json
import os
import sys

import pytest

# RAY_enable_export_api_write env var must be set before importing
# `ray` so the correct value is set for RAY_ENABLE_EXPORT_API_WRITE
# even outside a Ray driver.
os.environ["RAY_enable_export_api_write"] = "true"

import ray
from ray._private.gcs_utils import GcsAioClient
from ray._private.test_utils import async_wait_for_condition_async_predicate
from ray.dashboard.modules.job.job_manager import JobManager
from ray.job_submission import JobStatus
from ray.tests.conftest import call_ray_start # noqa: F401


async def check_job_succeeded(job_manager, job_id):
data = await job_manager.get_job_info(job_id)
status = data.status
if status == JobStatus.FAILED:
raise RuntimeError(f"Job failed! {data.message}")
assert status in {JobStatus.PENDING, JobStatus.RUNNING, JobStatus.SUCCEEDED}
if status == JobStatus.SUCCEEDED:
assert data.driver_exit_code == 0
else:
assert data.driver_exit_code is None
return status == JobStatus.SUCCEEDED


@pytest.mark.asyncio
@pytest.mark.parametrize(
"call_ray_start",
[
{
"env": {
"RAY_enable_export_api_write": "true",
},
"cmd": "ray start --head",
}
],
indirect=True,
)
async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa: F811
"""
Test submission job events are correctly generated and written to file
as the job goes through various state changes in its lifecycle.
"""

address_info = ray.init(address=call_ray_start)
gcs_aio_client = GcsAioClient(
address=address_info["gcs_address"], nums_reconnect_retry=0
)
job_manager = JobManager(gcs_aio_client, tmp_path)

# Submit a job.
submission_id = await job_manager.submit_job(
entrypoint="ls",
)

# Wait for the job to be finished.
await async_wait_for_condition_async_predicate(
check_job_succeeded, job_manager=job_manager, job_id=submission_id
)

# Verify export events are written
event_dir = f"{tmp_path}/events"
assert os.path.isdir(event_dir)
event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log"
assert os.path.isfile(event_file)

with open(event_file, "r") as f:
lines = f.readlines()
assert len(lines) == 3
expected_status_values = ["PENDING", "RUNNING", "SUCCEEDED"]

for line, expected_status in zip(lines, expected_status_values):
data = json.loads(line)
assert data["source_type"] == "EXPORT_SUBMISSION_JOB"
assert data["event_data"]["submission_job_id"] == submission_id
assert data["event_data"]["status"] == expected_status


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))