Change retry logic for jobs to change spot instance types depending on the retry count #2181

Open
ravenac95 opened this issue Sep 20, 2024 · 0 comments

ravenac95 commented Sep 20, 2024

What is it?

Currently, when the daily jobs retry, they keep requesting the same node types. However, because some of these jobs may be scheduled onto spot instances, they can end up failing for reasons unrelated to the job itself. Instead, retries should be scheduled onto progressively different instance types. I asked Dagster's GPT about this, and here is what it said might be possible:

Currently, there isn't a built-in way to dynamically update the op_tags for the k8s_job_executor based on the number of retries directly within Dagster. However, you can achieve this by implementing a custom retry mechanism using sensors or hooks to monitor the number of retries and adjust the configuration accordingly.

Here's a high-level approach to achieve this:

1. Track the number of retries: Use a sensor or a hook to monitor the job runs and keep track of the number of retries.

2. Adjust configuration based on retries: When the number of retries exceeds a certain threshold (e.g., 2), modify the op_tags to allocate more resources or change other configurations.

3. Re-execute the job with updated configuration: Trigger a re-execution of the job with the updated op_tags.

Below is a conceptual example using a sensor to monitor job failures and re-execute with updated op_tags:

from dagster import (
    DagsterEventType,
    EventRecordsFilter,
    ReexecutionOptions,
    execute_job,
    job,
    op,
    reconstructable,
    sensor,
)
from dagster_k8s import k8s_job_executor

# Define the job and ops with their initial (small) resource requests.
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {"requests": {"cpu": "200m", "memory": "32Mi"}},
            },
        },
    }
)
def my_op(context):
    context.log.info("Running my_op")

@job(executor_def=k8s_job_executor)
def my_job():
    my_op()

# Sensor that watches for run failures and re-executes the failed run,
# requesting more resources once the run has been retried past a threshold.
@sensor(job=my_job)
def retry_sensor(context):
    instance = context.instance
    run_failure_records = instance.get_event_records(
        EventRecordsFilter(event_type=DagsterEventType.RUN_FAILURE),
        ascending=False,
        limit=1,
    )

    if not run_failure_records:
        return

    failed_run_id = run_failure_records[0].run_id
    failed_run = instance.get_run_by_id(failed_run_id)
    retry_count = int(failed_run.tags.get("dagster/retry_number", "0"))

    # Carry a retry counter on the re-executed run so later evaluations can read it.
    tags = {"dagster/retry_number": str(retry_count + 1)}

    if retry_count > 2:
        # Past the threshold, attach an updated dagster-k8s/config tag that
        # requests more resources for the re-executed run.
        tags["dagster-k8s/config"] = {
            "container_config": {
                "resources": {"requests": {"cpu": "500m", "memory": "64Mi"}},
            },
        }

    # Re-execute from the failed run, carrying the updated tags.
    options = ReexecutionOptions.from_failure(failed_run_id, instance)
    execute_job(
        reconstructable(my_job),
        instance=instance,
        reexecution_options=options,
        tags=tags,
    )

ravenac95 self-assigned this Sep 20, 2024