Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: assign worker id to avoid task duplication #241

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/src/logic/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ impl RequestExt for CreateRequest {
Some(Task {
id: Id::now(IdPrefix::Task),
start_time: Utc::now().timestamp_millis(),
worker_id: 0,
end_time: None,
payload: self.payload.clone(),
endpoint: self.endpoint.clone(),
status: None,
r#await: self.r#await,
log_trail: vec![],
metadata: RecordMetadata::default(),
})
}
Expand Down
3 changes: 1 addition & 2 deletions api/tests/http/crud.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::context::TestServer;
use api::logic::{common_model, tasks, ReadResponse};
use api::logic::{common_model, ReadResponse};
use api::logic::{connection_definition, connection_model_definition, connection_model_schema};
use entities::task::Task;
use entities::{
common_model::CommonModel, connection_definition::ConnectionDefinition,
connection_model_definition::ConnectionModelDefinition,
Expand Down
1 change: 1 addition & 0 deletions entities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ axum.workspace = true
base64.workspace = true
base64ct.workspace = true
bson.workspace = true
bytes = { version = "1.10.0", features = ["serde"] }
chrono.workspace = true
ctr = "0.9.2"
derive_builder.workspace = true
Expand Down
7 changes: 4 additions & 3 deletions entities/src/domain/event/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{record_metadata::RecordMetadata, Id};
use http::StatusCode;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand All @@ -8,13 +8,14 @@ use serde_json::Value;
pub struct Task {
#[serde(rename = "_id")]
pub id: Id,
pub worker_id: i64,
pub start_time: i64,
pub end_time: Option<i64>,
pub payload: Value,
pub endpoint: String,
#[serde(with = "http_serde_ext_ios::status_code::option")]
pub status: Option<StatusCode>,
pub status: Option<String>,
pub r#await: bool,
pub log_trail: Vec<Bytes>,
#[serde(flatten)]
pub metadata: RecordMetadata,
}
40 changes: 34 additions & 6 deletions watchdog/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::{self, WatchdogConfig};
use crate::config::WatchdogConfig;
use bson::doc;
use cache::remote::RedisCache;
use chrono::Utc;
Expand Down Expand Up @@ -91,8 +91,9 @@ impl WatchdogClient {
.get_many(
Some(doc! {
"active": true,
"workerId": 0,
"startTime": {
"$lte": Utc::now().timestamp_millis()
"$lte": Utc::now().timestamp_millis(),
}}),
None,
None,
Expand All @@ -101,6 +102,24 @@ impl WatchdogClient {
)
.await?;

tracing::info!("Executing {} tasks", tasks.len());

self.tasks
.update_many(
doc! {
"_id": {
"$in": tasks.iter().map(|t| t.id.to_string()).collect::<Vec<_>>()
}
},
doc! {
"$set": {
"workerId": 1,
"active": false
}
},
)
.await?;

let client = self.client.clone();
let tasks_store = self.tasks.clone();
let timeout = self.watchdog.http_client_timeout_secs;
Expand All @@ -121,8 +140,6 @@ impl WatchdogClient {
}
});

tracing::info!("Executing next batch of tasks");

tokio::time::sleep(Duration::from_secs(
self.watchdog.rate_limiter_refresh_interval,
))
Expand Down Expand Up @@ -152,12 +169,23 @@ async fn execute(

let status = response.status();
let mut stream = response.bytes_stream();
let mut log_trail = vec![];

while let Some(item) = stream.next().await {
tracing::debug!("Response from API {:?}", item);
tracing::info!("Response length from API {:?}", item.map(|b| b.len()));
log_trail.push(item);
}

let log_trail = log_trail
.into_iter()
.filter_map(|x| x.ok())
.collect::<Vec<_>>();

let bson_log_trail = bson::to_bson(&log_trail).map_err(|e| {
error!("Could not convert log trail to BSON: {e}");
InternalError::io_err(e.to_string().as_str(), None)
})?;

tasks_store
.collection
.find_one_and_update(
Expand All @@ -168,7 +196,7 @@ async fn execute(
"$set": {
"status": status.to_string(),
"endTime": Utc::now().timestamp_millis(),
"active": false
"logTrail": bson_log_trail,
}
},
)
Expand Down