diff --git a/server/processor/src/task_creator.rs b/server/processor/src/task_creator.rs index 8dbfc4e91..a1dfea132 100644 --- a/server/processor/src/task_creator.rs +++ b/server/processor/src/task_creator.rs @@ -1,3 +1,4 @@ +use core::task; use std::{sync::Arc, vec}; use anyhow::{anyhow, Result}; @@ -13,7 +14,7 @@ use data_model::{ TaskOutputsIngestedEvent, }; use state_store::IndexifyState; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; #[derive(Debug)] pub struct TaskCreationResult { @@ -68,7 +69,8 @@ impl TaskCreator { error!("error getting task from finished event: {:?}", e); e })?; - if task.is_none() { + + let Some(task) = task else { error!( "task not found for task finished event: {}", task_finished_event.task_id @@ -79,8 +81,7 @@ impl TaskCreator { &task_finished_event.invocation_id, None, )); - } - let task = task.ok_or(anyhow!("task not found: {}", task_finished_event.task_id))?; + }; let compute_graph_version = indexify_state .reader() @@ -222,7 +223,8 @@ impl TaskCreator { e ) })?; - if invocation_ctx.is_none() { + + let Some(mut invocation_ctx ) = invocation_ctx else { trace!("no invocation ctx, stopping scheduling of child tasks"); return Ok(TaskCreationResult::no_tasks( &task.namespace, @@ -230,11 +232,17 @@ impl TaskCreator { &task.invocation_id, None, )); + }; + + if invocation_ctx.completed { + warn!("invocation already completed, stopping scheduling of child tasks"); + return Ok(TaskCreationResult::no_tasks( + &task.namespace, + &task.compute_graph_name, + &task.invocation_id, + None, + )); } - let mut invocation_ctx = invocation_ctx.ok_or(anyhow!( - "invocation context not found for invocation_id {}", - task.invocation_id - ))?; trace!("invocation context: {:?}", invocation_ctx); invocation_ctx.update_analytics(&task);