Skip to content

Commit

Permalink
fix(server): serialize task finalization during task output ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben committed Jan 16, 2025
1 parent 1846a1b commit 1938f85
Show file tree
Hide file tree
Showing 15 changed files with 966 additions and 1,163 deletions.
56 changes: 41 additions & 15 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use filter::LabelsFilter;
use indexify_utils::{default_creation_time, get_epoch_time_in_ms};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use strum::{AsRefStr, Display};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateMachineMetadata {
Expand Down Expand Up @@ -377,7 +376,7 @@ impl ComputeGraph {
}
}

#[derive(Debug, Display, PartialEq)]
#[derive(Debug, strum::Display, PartialEq)]
pub enum ComputeGraphError {
VersionExists,
}
Expand Down Expand Up @@ -691,6 +690,20 @@ impl ReduceTask {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TaskOutputsIngestionStatus {
/// Outputs are not ingested yet.
Pending,
/// Outputs were ingested.
Ingested,
}

impl TaskOutputsIngestionStatus {
fn pending() -> Self {
Self::Pending
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TaskOutcome {
Unknown,
Expand All @@ -707,6 +720,8 @@ pub struct Task {
pub compute_graph_name: String,
pub invocation_id: String,
pub input_node_output_key: String,
#[serde(default = "TaskOutputsIngestionStatus::pending")]
pub output_status: TaskOutputsIngestionStatus,
pub outcome: TaskOutcome,
#[serde(default = "default_creation_time")]
pub creation_time: SystemTime,
Expand All @@ -717,10 +732,6 @@ pub struct Task {
}

impl Task {
pub fn terminal_state(&self) -> bool {
self.outcome != TaskOutcome::Unknown
}

pub fn key_prefix_for_fn(
namespace: &str,
compute_graph: &str,
Expand Down Expand Up @@ -828,6 +839,7 @@ impl TaskBuilder {
input_node_output_key: input_key,
invocation_id,
namespace,
output_status: TaskOutputsIngestionStatus::Pending,
outcome: TaskOutcome::Unknown,
creation_time: SystemTime::now(),
diagnostics: None,
Expand Down Expand Up @@ -904,15 +916,15 @@ pub struct InvokeComputeGraphEvent {
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
pub struct TaskFinishedEvent {
pub struct TaskFinalizedEvent {
pub namespace: String,
pub compute_graph: String,
pub compute_fn: String,
pub invocation_id: String,
pub task_id: TaskId,
}

impl fmt::Display for TaskFinishedEvent {
impl fmt::Display for TaskFinalizedEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
Expand All @@ -922,6 +934,18 @@ impl fmt::Display for TaskFinishedEvent {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct TaskOutputsIngestedEvent {
pub namespace: String,
pub compute_graph: String,
pub compute_fn: String,
pub invocation_id: String,
pub task_id: TaskId,
pub outcome: TaskOutcome,
pub executor_id: ExecutorId,
pub diagnostic: Option<TaskDiagnostics>,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct TaskCreatedEvent {
pub task: Task,
Expand All @@ -932,28 +956,29 @@ pub struct ExecutorRemovedEvent {
pub executor_id: ExecutorId,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct TombstoneComputeGraphEvent {
pub namespace: String,
pub compute_graph: String,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct TombstoneInvocationEvent {
pub namespace: String,
pub compute_graph: String,
pub invocation_id: String,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct ExecutorAddedEvent {
pub executor_id: ExecutorId,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, AsRefStr)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub enum ChangeType {
InvokeComputeGraph(InvokeComputeGraphEvent),
TaskFinished(TaskFinishedEvent),
TaskFinalized(TaskFinalizedEvent),
TaskOutputsIngested(TaskOutputsIngestedEvent),
TombstoneComputeGraph(TombstoneComputeGraphEvent),
TombstoneInvocation(TombstoneInvocationEvent),
ExecutorAdded(ExecutorAddedEvent),
Expand All @@ -966,7 +991,8 @@ impl fmt::Display for ChangeType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChangeType::InvokeComputeGraph(_) => write!(f, "InvokeComputeGraph"),
ChangeType::TaskFinished(_) => write!(f, "TaskFinished"),
ChangeType::TaskFinalized(_) => write!(f, "TaskFinalized"),
ChangeType::TaskOutputsIngested(_) => write!(f, "TaskOutputsIngested"),
ChangeType::TombstoneComputeGraph(_) => write!(f, "TombstoneComputeGraph"),
ChangeType::ExecutorAdded(_) => write!(f, "ExecutorAdded"),
ChangeType::TombStoneExecutor(_) => write!(f, "TombStoneExecutor"),
Expand All @@ -977,7 +1003,7 @@ impl fmt::Display for ChangeType {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Copy, Ord, PartialOrd)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd)]
pub struct StateChangeId(u64);

impl StateChangeId {
Expand Down
8 changes: 4 additions & 4 deletions server/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,17 @@ impl StateStoreMetrics {
}
}

pub fn task_unassigned(&self, tasks: Vec<Task>) {
pub fn task_unassigned(&self, tasks: &[Task]) {
for task in tasks {
let id = FnMetricsId::from_task(&task);
let id = FnMetricsId::from_task(task);
match self.unassigned_tasks.write() {
Ok(mut count) => *count.entry(id).or_insert(0) += 1,
Err(e) => tracing::error!("Failed to lock unassigned_tasks: {:?}", e),
}
}
}

pub fn task_assigned(&self, tasks: Vec<Task>, executor_id: &str) {
pub fn task_assigned(&self, tasks: &[Task], executor_id: &str) {
match self.tasks_by_executor.write() {
Ok(mut tasks_by_executor) => {
tasks_by_executor
Expand All @@ -606,7 +606,7 @@ impl StateStoreMetrics {
}

for task in tasks {
let id = FnMetricsId::from_task(&task);
let id = FnMetricsId::from_task(task);
match self.assigned_tasks.write() {
Ok(mut count_assigned) => *count_assigned.entry(id.clone()).or_insert(0) += 1,
Err(e) => tracing::error!("Failed to lock assigned_tasks: {:?}", e),
Expand Down
74 changes: 50 additions & 24 deletions server/processor/src/graph_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use state_store::{
requests::{
DeleteComputeGraphRequest,
DeleteInvocationRequest,
FinalizeTaskRequest,
MutateClusterTopologyRequest,
NamespaceProcessorUpdateRequest,
ReductionTasks,
Expand All @@ -16,7 +17,7 @@ use state_store::{
IndexifyState,
};
use tokio::sync::Notify;
use tracing::info;
use tracing::{error, info};

use crate::{
task_allocator::{self, TaskPlacementResult},
Expand Down Expand Up @@ -55,18 +56,18 @@ impl GraphProcessor {
_ = change_events_rx.changed() => {
change_events_rx.borrow_and_update();
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &notify).await {
tracing::error!("error processing state change: {:?}", err);
error!("error processing state change: {:?}", err);
continue
}
},
_ = notify.notified() => {
if let Err(err) = self.write_sm_update(&mut cached_state_changes, &notify).await {
tracing::error!("error processing state change: {:?}", err);
error!("error processing state change: {:?}", err);
continue
}
},
_ = shutdown_rx.changed() => {
tracing::info!("graph processor shutting down");
info!("graph processor shutting down");
break;
}
}
Expand All @@ -81,7 +82,14 @@ impl GraphProcessor {
// 1. First load 100 state changes. Process the `global` state changes first
// and then the `ns_` state changes
if cached_state_changes.is_empty() {
cached_state_changes.extend(self.indexify_state.reader().unprocessed_state_changes()?);
cached_state_changes.extend(
self.indexify_state
.reader()
.unprocessed_state_changes()?
// reversing the vec to process the oldest state changes first when using pop.
.into_iter()
.rev(),
);
}
// 2. If there are no state changes to process, return
// and wait for the scheduler to wake us up again when there are state changes
Expand All @@ -105,7 +113,12 @@ impl GraphProcessor {
let sm_update = match sm_update {
Ok(sm_update) => sm_update,
Err(err) => {
tracing::error!("error processing state change: {:?}", err);
// TODO: Determine if error is transient or not to determine if retrying should
// be done.
error!(
"error processing state change {}, marking as processed: {:?}",
state_change.change_type, err
);

// Sending NOOP SM Update
StateMachineUpdateRequest {
Expand All @@ -116,9 +129,11 @@ impl GraphProcessor {
};
// 6. Write the state change
if let Err(err) = self.indexify_state.write(sm_update).await {
tracing::error!(
"error writing state change: {:?}, attempting to mark the state change as NOOP",
err
// TODO: Determine if error is transient or not to determine if retrying should
// be done.
error!(
"error writing state change {}, marking as processed: {:?}",
state_change.change_type, err,
);
// 7. If SM update fails for whatever reason, lets just write a NOOP state
// change
Expand All @@ -136,7 +151,7 @@ impl GraphProcessor {
&self,
state_change: &StateChange,
) -> Result<StateMachineUpdateRequest> {
let scheduler_update = match &state_change.change_type {
match &state_change.change_type {
ChangeType::InvokeComputeGraph(event) => {
let task_creation_result = self
.task_creator
Expand All @@ -150,7 +165,20 @@ impl GraphProcessor {
&state_change,
))
}
ChangeType::TaskFinished(event) => {
ChangeType::TaskOutputsIngested(event) => Ok(StateMachineUpdateRequest {
payload: RequestPayload::FinalizeTask(FinalizeTaskRequest {
namespace: event.namespace.clone(),
compute_graph: event.compute_graph.clone(),
compute_fn: event.compute_fn.clone(),
invocation_id: event.invocation_id.clone(),
task_id: event.task_id.clone(),
task_outcome: event.outcome.clone(),
executor_id: event.executor_id.clone(),
diagnostics: event.diagnostic.clone(),
}),
processed_state_changes: vec![state_change.clone()],
}),
ChangeType::TaskFinalized(event) => {
let task_creation_result = self
.task_creator
.handle_task_finished_inner(self.indexify_state.clone(), event)
Expand Down Expand Up @@ -181,28 +209,27 @@ impl GraphProcessor {
ChangeType::ExecutorAdded(event) => {
info!("registering executor: {:?}", event);
if let Err(err) = self.task_allocator.refresh_executors() {
tracing::error!("error refreshing executors: {:?}", err);
error!("error refreshing executors: {:?}", err);
}
let result = self.task_allocator.schedule_unplaced_tasks()?;
Ok(task_placement_result_to_sm_update(result, &state_change))
}
ChangeType::ExecutorRemoved(_event) => {
info!("de-registering executor {:?}", _event);
ChangeType::ExecutorRemoved(event) => {
info!("de-registering executor {:?}", event);
if let Err(err) = self.task_allocator.refresh_executors() {
tracing::error!("error refreshing executors: {:?}", err);
error!("error refreshing executors: {:?}", err);
}
let task_allocation_result = self.task_allocator.schedule_unplaced_tasks();
if let Err(err) = &task_allocation_result {
tracing::error!("error scheduling unplaced tasks: {:?}", err);
}
let result = task_allocation_result.unwrap();
if result.task_placements.is_empty() {
let task_allocation = self.task_allocator.schedule_unplaced_tasks()?;
if task_allocation.task_placements.is_empty() {
Ok(StateMachineUpdateRequest {
payload: RequestPayload::Noop,
processed_state_changes: vec![state_change.clone()],
})
} else {
Ok(task_placement_result_to_sm_update(result, &state_change))
Ok(task_placement_result_to_sm_update(
task_allocation,
&state_change,
))
}
}
ChangeType::TombStoneExecutor(event) => {
Expand All @@ -220,8 +247,7 @@ impl GraphProcessor {
.schedule_tasks(vec![event.task.clone()])?;
Ok(task_placement_result_to_sm_update(result, &state_change))
}
};
scheduler_update
}
}
}

Expand Down
11 changes: 7 additions & 4 deletions server/processor/src/task_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use data_model::{
OutputPayload,
ReduceTask,
Task,
TaskFinishedEvent,
TaskFinalizedEvent,
TaskOutcome,
};
use state_store::IndexifyState;
use tracing::{debug, error, info, trace};
use tracing::{error, info, trace};

#[derive(Debug)]
pub struct TaskCreationResult {
Expand Down Expand Up @@ -51,7 +51,7 @@ impl TaskCreator {
pub async fn handle_task_finished_inner(
&self,
indexify_state: Arc<IndexifyState>,
task_finished_event: &TaskFinishedEvent,
task_finished_event: &TaskFinalizedEvent,
) -> Result<TaskCreationResult> {
let task = indexify_state
.reader()
Expand Down Expand Up @@ -209,6 +209,7 @@ impl TaskCreator {
)
})?;
if invocation_ctx.is_none() {
trace!("no invocation ctx, stopping scheduling of child tasks");
return Ok(TaskCreationResult::no_tasks(
&task.namespace,
&task.compute_graph_name,
Expand Down Expand Up @@ -378,7 +379,7 @@ impl TaskCreator {

// If there are no edges, check if the invocation should be finished.
if edges.is_none() {
debug!(
trace!(
"No more edges to schedule tasks for, waiting for outstanding tasks to finalize"
);
return Ok(TaskCreationResult::no_tasks(
Expand Down Expand Up @@ -548,6 +549,8 @@ impl TaskCreator {
new_tasks.push(new_task);
}
}

trace!("tasks: {:?}", new_tasks.len());
Ok(TaskCreationResult {
namespace: task.namespace.clone(),
compute_graph: task.compute_graph_name.clone(),
Expand Down
Loading

0 comments on commit 1938f85

Please sign in to comment.