Skip to content

Commit

Permalink
Fix graph invocation ctx (#1202)
Browse files Browse the repository at this point in the history
* updatating graph invocation ctx

* updating graph context
  • Loading branch information
diptanu authored Jan 30, 2025
1 parent 2a6fcb7 commit d387d60
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "indexify-server"
version = "0.2.23"
version = "0.2.24"
edition = "2021"
authors = ["Tensorlake Inc. <[email protected]>"]
license = "Apache-2.0"
Expand Down
4 changes: 4 additions & 0 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ impl GraphInvocationCtx {
pub fn get_task_analytics(&self, compute_fn: &str) -> Option<&TaskAnalytics> {
self.fn_task_analytics.get(compute_fn)
}

pub fn key_prefix_for_cg(namespace: &str, compute_graph: &str) -> String {
format!("{}|{}", namespace, compute_graph)
}
}

impl GraphInvocationCtxBuilder {
Expand Down
44 changes: 43 additions & 1 deletion server/state_store/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,47 @@ fn update_task_versions_for_cg(
Ok(())
}

fn update_graph_invocations_for_cg(
db: Arc<TransactionDB>,
txn: &Transaction<TransactionDB>,
compute_graph: &ComputeGraph,
) -> Result<()> {
let cg_prefix =
GraphInvocationCtx::key_prefix_for_cg(&compute_graph.namespace, &compute_graph.name);
let mut read_options = ReadOptions::default();
read_options.set_readahead_size(10_194_304);
let iter = db.iterator_cf_opt(
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
read_options,
IteratorMode::From(&cg_prefix.as_bytes(), Direction::Forward),
);

let mut graph_invocation_ctx_to_update = HashMap::new();
for kv in iter {
let (key, val) = kv?;
let mut graph_invocation_ctx: GraphInvocationCtx = JsonEncoder::decode(&val)?;
if (graph_invocation_ctx.graph_version != compute_graph.version) &&
!graph_invocation_ctx.completed
{
info!(
"updating graph_invocation_ctx for invocation id: {} from version: {} to version: {}",
graph_invocation_ctx.invocation_id, graph_invocation_ctx.graph_version.0, compute_graph.version.0
);
graph_invocation_ctx.graph_version = compute_graph.version.clone();
}
graph_invocation_ctx_to_update.insert(key, graph_invocation_ctx);
}
for (invocation_id, graph_invocation_ctx) in graph_invocation_ctx_to_update {
let serialized_task = JsonEncoder::encode(&graph_invocation_ctx)?;
txn.put_cf(
&IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&db),
&invocation_id,
&serialized_task,
)?;
}
Ok(())
}

pub(crate) fn create_or_update_compute_graph(
db: Arc<TransactionDB>,
txn: &Transaction<TransactionDB>,
Expand Down Expand Up @@ -536,7 +577,8 @@ pub(crate) fn create_or_update_compute_graph(
&serialized_compute_graph,
)?;
if upgrade_existing_tasks_to_current_version {
update_task_versions_for_cg(db, txn, &compute_graph)?;
update_task_versions_for_cg(db.clone(), txn, &compute_graph)?;
update_graph_invocations_for_cg(db.clone(), txn, &compute_graph)?;
}
Ok(())
}
Expand Down

0 comments on commit d387d60

Please sign in to comment.