Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move export events to separate folder #47747

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/_private/event/export_event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _build_export_event_file_logger(
) -> logging.Logger:
logger = logging.getLogger("_ray_export_event_logger_" + source)
logger.setLevel(logging.INFO)
dir_path = pathlib.Path(sink_dir) / "events"
dir_path = pathlib.Path(sink_dir) / "export_events"
filepath = dir_path / f"event_{source}.log"
dir_path.mkdir(exist_ok=True)
filepath.touch(exist_ok=True)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/modules/event/tests/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ def test_export_event_logger(tmp_path):
)
logger.send_event(event_data)

event_dir = tmp_path / "events"
event_dir = tmp_path / "export_events"
assert event_dir.exists()
event_file = event_dir / "event_EXPORT_SUBMISSION_JOB.log"
assert event_file.exists()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def test_submission_job_export_events(call_ray_start, tmp_path): # noqa:
)

# Verify export events are written
event_dir = f"{tmp_path}/events"
event_dir = f"{tmp_path}/export_events"
assert os.path.isdir(event_dir)
event_file = f"{event_dir}/event_EXPORT_SUBMISSION_JOB.log"
assert os.path.isfile(event_file)
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tests/test_runtime_env_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ def job_failed(job_id):
with pytest.raises(AssertionError):
assert_no_user_info_in_logs(USER_SECRET)

assert_no_user_info_in_logs(USER_SECRET, file_whitelist=["runtime_env*.log"])
assert_no_user_info_in_logs(
USER_SECRET, file_whitelist=["runtime_env*.log", "event_EXPORT*.log"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary? There shouldn't be "user info" in the export events, should there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Various export events contain runtime env info (which is considered user info), because the runtime env is returned by some dashboard APIs.

)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
std::vector<std::string> vc;
for (int i = 0; i * batch_size < max_export_events_on_buffer; i++) {
task_event_buffer_->FlushEvents(true);
ReadContentFromFile(
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
ReadContentFromFile(vc,
log_dir_ + "/export_events/event_EXPORT_TASK_" +
std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), (i + 1) * batch_size);
vc.clear();
}
Expand All @@ -178,7 +179,8 @@ TEST_F(TaskEventTestWriteExport, TestWriteTaskExportEvents) {
// max_export_events_on_buffer > task_events_max_num_status_events_buffer_on_worker
vc.clear();
ReadContentFromFile(
vc, log_dir_ + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
vc,
log_dir_ + "/export_events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
EXPECT_EQ((int)vc.size(), max_export_events_on_buffer);
for (size_t i = 0; i < max_export_events_on_buffer; i++) {
json export_event_as_json = json::parse(vc[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
"DEPENDENCIES_UNREADY", "PENDING_CREATION", "ALIVE", "DEAD"};
std::vector<std::string> vc;
for (int i = 0; i < num_retry; i++) {
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log");
if ((int)vc.size() == num_export_events) {
for (int event_idx = 0; event_idx < num_export_events; event_idx++) {
json export_event_as_json = json::parse(vc[event_idx]);
Expand All @@ -319,7 +319,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
vc.clear();
}
}
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_ACTOR.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_ACTOR.log");
std::ostringstream lines;
for (auto line : vc) {
lines << line << "\n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ TEST_F(GcsJobManagerTest, TestExportDriverJobEvents) {
promise.get_future().get();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_DRIVER_JOB.log");
Mocker::ReadContentFromFile(vc,
log_dir_ + "/export_events/event_EXPORT_DRIVER_JOB.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["is_dead"], false);
Expand All @@ -149,7 +150,8 @@ TEST_F(GcsJobManagerTest, TestExportDriverJobEvents) {
job_finished_promise.get_future().get();

vc.clear();
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_DRIVER_JOB.log");
Mocker::ReadContentFromFile(vc,
log_dir_ + "/export_events/event_EXPORT_DRIVER_JOB.log");
ASSERT_EQ((int)vc.size(), 2);
event_data = json::parse(vc[1])["event_data"].get<json>();
ASSERT_EQ(event_data["is_dead"], true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ TEST_F(GcsNodeManagerExportAPITest, TestExportEventRegisterNode) {
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "ALIVE");
Expand Down Expand Up @@ -120,7 +120,7 @@ TEST_F(GcsNodeManagerExportAPITest, TestExportEventUnregisterNode) {
io_service_.poll();

std::vector<std::string> vc;
Mocker::ReadContentFromFile(vc, log_dir_ + "/events/event_EXPORT_NODE.log");
Mocker::ReadContentFromFile(vc, log_dir_ + "/export_events/event_EXPORT_NODE.log");
ASSERT_EQ((int)vc.size(), 1);
json event_data = json::parse(vc[0])["event_data"].get<json>();
ASSERT_EQ(event_data["state"], "DEAD");
Expand Down
1 change: 1 addition & 0 deletions src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ void RayEventInit_(const std::vector<SourceTypeVariant> source_types,
} else if (auto export_event_source_type_ptr =
std::get_if<rpc::ExportEvent_SourceType>(&source_type)) {
// For export events
event_dir = std::filesystem::path(log_dir) / std::filesystem::path("export_events");
source_type_name = ExportEvent_SourceType_Name(*export_event_source_type_ptr);
ray::EventManager::Instance().AddExportReporter(
*export_event_source_type_ptr,
Expand Down
26 changes: 19 additions & 7 deletions src/ray/util/tests/event_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,18 +471,27 @@ TEST_F(EventTest, TestWithField) {
}

TEST_F(EventTest, TestExportEvent) {
std::vector<SourceTypeVariant> source_types = {rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK, rpc::Event_SourceType::Event_SourceType_RAYLET};
RayEventInit_(source_types, absl::flat_hash_map<std::string, std::string>(), log_dir, "warning", false);

std::shared_ptr<rpc::ExportTaskEventData> task_event_ptr = std::make_shared<rpc::ExportTaskEventData>();
std::vector<SourceTypeVariant> source_types = {
rpc::ExportEvent_SourceType::ExportEvent_SourceType_EXPORT_TASK,
rpc::Event_SourceType::Event_SourceType_RAYLET};
RayEventInit_(source_types,
absl::flat_hash_map<std::string, std::string>(),
log_dir,
"warning",
false);

std::shared_ptr<rpc::ExportTaskEventData> task_event_ptr =
std::make_shared<rpc::ExportTaskEventData>();
task_event_ptr->set_task_id("task_id0");
task_event_ptr->set_attempt_number(1);
task_event_ptr->set_job_id("job_id0");

std::string export_event_data_str;
google::protobuf::util::JsonPrintOptions options;
options.preserve_proto_field_names = true;
RAY_CHECK(google::protobuf::util::MessageToJsonString(*task_event_ptr, &export_event_data_str, options).ok());
RAY_CHECK(google::protobuf::util::MessageToJsonString(
*task_event_ptr, &export_event_data_str, options)
.ok());
json event_data_as_json = json::parse(export_event_data_str);

RayExportEvent(task_event_ptr).SendEvent();
Expand All @@ -491,7 +500,9 @@ TEST_F(EventTest, TestExportEvent) {
RAY_EVENT(WARNING, "label") << "test warning";

std::vector<std::string> vc;
ReadContentFromFile(vc, log_dir + "/events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");
ReadContentFromFile(
vc,
log_dir + "/export_events/event_EXPORT_TASK_" + std::to_string(getpid()) + ".log");

EXPECT_EQ((int)vc.size(), 1);

Expand Down Expand Up @@ -558,7 +569,8 @@ TEST_F(EventTest, TestRayEventInit) {
custom_fields.emplace("node_id", "node 1");
custom_fields.emplace("job_id", "job 1");
custom_fields.emplace("task_id", "task 1");
const std::vector<SourceTypeVariant> source_types = {rpc::Event_SourceType::Event_SourceType_RAYLET};
const std::vector<SourceTypeVariant> source_types = {
rpc::Event_SourceType::Event_SourceType_RAYLET};
RayEventInit_(source_types, custom_fields, log_dir, "warning", false);

RAY_EVENT(FATAL, "label") << "test error event";
Expand Down