
[observability][export-api] Write submission job events #47468

Open · wants to merge 26 commits into master

Conversation

@nikitavemuri (Contributor) commented Sep 3, 2024

Why are these changes needed?

  • Add ExportEventLoggerAdapter, which is used to write export events to file from Python code. Only a single ExportEventLoggerAdapter instance exists per source type, so callers can create or get this instance with get_export_event_logger, which is thread safe.
  • Write Submission Job export events to file from JobInfoStorageClient.put_info, which is called to update the JobInfo data in the internal KV store.

Related issue number

Checks

  • I've signed off every commit (using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Nikita Vemuri added 6 commits September 3, 2024 00:31
wip; each commit Signed-off-by: Nikita Vemuri <[email protected]>
@nikitavemuri nikitavemuri added the go add ONLY when ready to merge, run all tests label Sep 3, 2024
Nikita Vemuri added 10 commits September 3, 2024 21:10
Each commit Signed-off-by: Nikita Vemuri <[email protected]>
@nikitavemuri nikitavemuri changed the title [WIP] Write job submission events [observability][export-api] Write submission job events Sep 5, 2024
@nikitavemuri nikitavemuri marked this pull request as ready for review September 5, 2024 20:05
ExportEventDataType = Union[ExportSubmissionJobEventData]


def get_event_id():
Contributor:

nit

Suggested change
def get_event_id():
def generate_event_id():

Comment on lines +49 to +50
# Force flush so that we won't lose events
self.logger.handlers[0].flush()
Contributor:

is this a pattern we follow in the other event exporters as well?

I think batched flushing is reasonable if it brings significant performance improvements at the cost of some reliability

Contributor:

Since it is only used by a job, it is unlikely to cause a problem, but I agree it is probably okay to lose some events here... (I don't think we guarantee a flush after every event finishes.)

Contributor (author):

Yeah, I added this behavior for consistency with the EventLoggerAdapter for existing python events

self.logger.handlers[0].flush()
The LogEventReporter for C++ events also flushes after each event by default
bool force_flush = true,

Contributor (author):

The export events from the python layer are unlikely to be created with very high volume (unlike tasks), so we can probably just keep this flush logic for now for consistency and optimize later if needed.
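The flush-per-event behavior discussed in this thread can be sketched as follows. This is a minimal illustration, not the real adapter: the class is reduced to the emit path, and the event payload is assumed to already be a serialized string.

```python
import logging


class ExportEventLoggerAdapter:
    """Minimal sketch: emit one event line, then flush every handler."""

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def send_event(self, event_json: str) -> None:
        self.logger.info(event_json)
        # Force flush so that we won't lose events if the process exits
        # before the handler's buffer is written out.
        for handler in self.logger.handlers:
            handler.flush()
```

Batching would amortize the flush cost at the risk of dropping the tail of the event stream on a crash, which matches the trade-off described above.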

_export_event_logger = {}


def get_export_event_logger(source: ExportEvent.SourceType, sink_dir: str):
Contributor:

can you check how we do structured logging in python layer and see if we should follow similar patterns?

Do they also use a singleton pattern like this?

Contributor (author):

Structured logging fetches various registered loggers for the different source types, but the existing Python events use this singleton pattern. We want a single logger per source to avoid concurrency issues from multiple threads. A register-logger approach is an option (it is also a singleton), but I think this API is cleaner because there are no separate initialize and get functions.
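The singleton-per-source pattern described here can be sketched roughly like this. It is a hypothetical simplification: the real get_export_event_logger takes an ExportEvent.SourceType and attaches a file handler under sink_dir, whereas here the source is a plain string and handler setup is elided.

```python
import logging
import threading

_export_event_logger = {}
_export_event_logger_lock = threading.Lock()


def get_export_event_logger(source: str, sink_dir: str) -> logging.Logger:
    """Return the single logger for `source`, creating it on first call."""
    with _export_event_logger_lock:
        if source not in _export_event_logger:
            logger = logging.getLogger(f"export_event_{source}")
            # The real adapter would attach a rotating file handler
            # writing under sink_dir here.
            _export_event_logger[source] = logger
        return _export_event_logger[source]
```

The lock makes concurrent first calls from multiple threads safe; after creation, every caller gets the same logger object back.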

Test submission job events are correctly generated and written to file
as the job goes through various state changes in its lifecycle.
"""
ray_constants.RAY_ENABLE_EXPORT_API_WRITE = True
Contributor:

This screws up the global variable for the rest of the tests. You should probably create a fixture if you want to do this (or alternatively, just update os.environ).

Contributor (author):

Updated to set the environment variable in addition to passing the env var when calling ray start, because the constant needs to be set outside Ray tasks as well when submitting a job.
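One way to scope the override, sketched here as a context manager rather than a pytest fixture (the env var name mirrors the constant above; the helper name is made up for the example):

```python
import os
from contextlib import contextmanager


@contextmanager
def export_api_write_enabled():
    """Temporarily enable the export API write flag, then restore it."""
    old = os.environ.get("RAY_ENABLE_EXPORT_API_WRITE")
    os.environ["RAY_ENABLE_EXPORT_API_WRITE"] = "true"
    try:
        yield
    finally:
        if old is None:
            os.environ.pop("RAY_ENABLE_EXPORT_API_WRITE", None)
        else:
            os.environ["RAY_ENABLE_EXPORT_API_WRITE"] = old
```

A pytest `monkeypatch.setenv` fixture achieves the same cleanup automatically at test teardown.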

filepath.touch(exist_ok=True)
# Configure the logger.
handler = logging.handlers.RotatingFileHandler(
filepath, maxBytes=(100 * 1e6), backupCount=20 # 100 MB max file size
Contributor:

make it configurable via env var?
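The reviewer's suggestion could look roughly like this. The env var names below are hypothetical, not existing Ray configuration; the defaults match the hard-coded 100 MB / 20-backup values above.

```python
import logging.handlers
import os


def build_export_file_handler(filepath: str) -> logging.handlers.RotatingFileHandler:
    """Build the rotating handler, with limits overridable via env vars."""
    max_bytes = int(os.environ.get("RAY_EXPORT_EVENT_MAX_FILE_SIZE_BYTES", 100 * 10**6))
    backup_count = int(os.environ.get("RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20))
    return logging.handlers.RotatingFileHandler(
        filepath, maxBytes=max_bytes, backupCount=backup_count
    )
```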

event = ExportEvent()
event.event_id = get_event_id()
event.timestamp = int(datetime.now().timestamp())
if type(event_data) is ExportSubmissionJobEventData:
Contributor:

Suggested change
if type(event_data) is ExportSubmissionJobEventData:
if isinstance(event_data, ExportSubmissionJobEventData):
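A quick illustration of why isinstance is preferred over an exact `type(...) is ...` check (all class names below are invented for the example):

```python
class ExportEventData:
    pass


class SubmissionJobEventData(ExportEventData):
    pass


class SpecializedJobEventData(SubmissionJobEventData):
    pass  # a hypothetical future subclass


event_data = SpecializedJobEventData()
# isinstance matches subclasses, so future specializations keep working:
assert isinstance(event_data, SubmissionJobEventData)
# an exact type check would silently skip them:
assert type(event_data) is not SubmissionJobEventData
```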

Contributor:

nit

self._gcs_aio_client = gcs_aio_client
self._export_submission_job_event_logger = None
Contributor:

Add a type!

@@ -189,8 +197,25 @@ class JobInfoStorageClient:
JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"

def __init__(self, gcs_aio_client: GcsAioClient):
def __init__(self, gcs_aio_client: GcsAioClient, log_dir: Optional[str] = None):
Contributor:

let's not allow log_dir = None case?

  def __init__(self, gcs_aio_client: GcsAioClient, log_dir: str):

Contributor (author):

I kept log_dir as an optional parameter because JobInfoStorageClient is constructed from quite a few places just to read job info (no updates to the KV store, so no export events are emitted). Those callers don't have a log_dir, so unless we add a global log dir that can be fetched, making this parameter non-optional would require a refactor throughout the jobs code.
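The trade-off can be sketched like this. The class here is a hypothetical reduction to just the constructor; in the real code, a non-None log_dir would trigger creation of the export event logger.

```python
from typing import Any, Optional


class JobInfoStorageClient:
    """Sketch: read-only callers omit log_dir and get no export logger."""

    def __init__(self, gcs_aio_client: Any, log_dir: Optional[str] = None):
        self._gcs_aio_client = gcs_aio_client
        self._export_submission_job_event_logger: Optional[object] = None
        if log_dir is not None:
            # In the real code this would call the thread-safe
            # logger getter with the submission-job source type.
            self._export_submission_job_event_logger = object()
```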

if added_num == 1 or overwrite:
# Write export event if data was updated in the KV store
try:
self._write_submission_job_export_event(job_id, job_info)
Contributor:

QQ: we don't write any event for failure cases?

Contributor (author):

This isn't expected to fail, but it's wrapped in a try/except so the remaining job code isn't affected in any way. If there is a failure, though, we should log to the normal job log file and not the export event log files, because the export event files may be ingested by some consumer.

job_info.status.name
)
)
if status_value_descriptor is None:
Contributor:

when does it happen?

Contributor (author):

When a new JobInfo status is added to the Python model without updating the proto. We do have a comment to keep these in sync, though, so this should be unlikely.
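The drift scenario can be illustrated without the generated proto by standing in a plain mapping for the proto enum's values-by-name lookup (all names below are invented for the example):

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    NEW_STATE = "NEW_STATE"  # added to the Python model only


# Stand-in for the proto enum's values_by_name table, not yet updated:
PROTO_STATUS_VALUES = {"PENDING": 0, "RUNNING": 1}


def proto_status_value(status: JobStatus):
    """Look up the proto value for a Python status; None means drift."""
    value = PROTO_STATUS_VALUES.get(status.name)
    if value is None:
        # Python model and proto are out of sync; caller logs and skips.
        return None
    return value
```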

@@ -46,6 +48,59 @@
import psutil


@pytest.mark.asyncio
Contributor:

can we create a new test suite for all export events? I assume we will need more e2e testing eventually in python layers, and it'd be nice to put them in the same file.

Nikita Vemuri added 5 commits September 5, 2024 17:49
Each commit Signed-off-by: Nikita Vemuri <[email protected]>
Nikita Vemuri added 5 commits September 19, 2024 13:13
Each commit Signed-off-by: Nikita Vemuri <[email protected]>
Labels
go add ONLY when ready to merge, run all tests
3 participants