Skip to content

Commit

Permalink
feat: make the task client hold the http connection (#240)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Feb 6, 2025
1 parent 82d974b commit bf325eb
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 30 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 5 additions & 14 deletions api/src/logic/knowledge.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
use super::{read_without_key, HookExt, PublicExt, ReadResponse, RequestExt};
use crate::{
router::ServerResponse,
server::{AppState, AppStores},
};
use axum::{
extract::{Query, State},
routing::get,
Json, Router,
};
use super::{read_without_key, HookExt, PublicExt, RequestExt};
use crate::server::{AppState, AppStores};
use axum::{routing::get, Router};
use bson::doc;
use entities::{record_metadata::RecordMetadata, Id, MongoStore, PicaError};
use entities::{record_metadata::RecordMetadata, Id, MongoStore};
use fake::Dummy;
use http::HeaderMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::BTreeMap, sync::Arc};
use std::sync::Arc;

pub fn get_router() -> Router<Arc<AppState>> {
Router::new().route("/", get(read_without_key::<ReadRequest, Knowledge>))
Expand Down
3 changes: 3 additions & 0 deletions api/src/logic/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct CreateRequest {
pub start_time: i64,
pub endpoint: String,
pub payload: Value,
#[serde(rename = "await")]
pub r#await: bool,
}

impl RequestExt for CreateRequest {
Expand All @@ -44,6 +46,7 @@ impl RequestExt for CreateRequest {
payload: self.payload.clone(),
endpoint: self.endpoint.clone(),
status: None,
r#await: self.r#await,
metadata: RecordMetadata::default(),
})
}
Expand Down
20 changes: 14 additions & 6 deletions api/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ impl TestServer {
// Get available port for server to listen
let port = TcpListener::bind("127.0.0.1:0")
.await
.unwrap()
.expect("Could not bind to port")
.local_addr()
.unwrap()
.expect("Could not get local address")
.port();

// Random database name
Expand Down Expand Up @@ -159,7 +159,7 @@ impl TestServer {
"ios-kms".to_string(),
),
]))
.unwrap();
.expect("Could not create envconfig");

let secrets_client = Arc::new(MockSecretsClient);

Expand All @@ -178,10 +178,14 @@ impl TestServer {
let iv = rand::thread_rng().gen::<[u8; 16]>();
let live_encrypted_key = live_access_key
.encode(
&config.event_access_password.as_bytes().try_into().unwrap(),
&config
.event_access_password
.as_bytes()
.try_into()
.expect("Could not convert to bytes"),
&iv,
)
.unwrap();
.expect("Could not encode access key");

let prefix = AccessKeyPrefix {
environment: Environment::Test,
Expand All @@ -191,7 +195,11 @@ impl TestServer {
let test_access_key = AccessKey { prefix, data };
let test_encrypted_key = test_access_key
.encode(
&config.event_access_password.as_bytes().try_into().unwrap(),
&config
.event_access_password
.as_bytes()
.try_into()
.expect("Could not convert to array"),
&iv,
)
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions entities/src/domain/event/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Task {
pub endpoint: String,
#[serde(with = "http_serde_ext_ios::status_code::option")]
pub status: Option<StatusCode>,
pub r#await: bool,
#[serde(flatten)]
pub metadata: RecordMetadata,
}
7 changes: 6 additions & 1 deletion watchdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ envconfig.workspace = true
futures.workspace = true
cache = { path = "../cache" }
entities = { path = "../entities" }
reqwest.workspace = true
reqwest = { workspace = true, features = ["stream"] }
serde_json.workspace = true
mongodb.workspace = true
redis.workspace = true
tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
testcontainers-modules = { workspace = true, features = ["mongo", "redis"] }
mockito.workspace = true
tracing-subscriber.workspace = true
uuid.workspace = true
31 changes: 23 additions & 8 deletions watchdog/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::WatchdogConfig;
use crate::config::{self, WatchdogConfig};
use bson::doc;
use cache::remote::RedisCache;
use chrono::Utc;
Expand Down Expand Up @@ -39,9 +39,7 @@ impl WatchdogClient {
cache: CacheConfig,
database: DatabaseConfig,
) -> Result<Self, PicaError> {
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(watchdog.http_client_timeout_secs))
.build()?;
let http_client = reqwest::ClientBuilder::new().build()?;
let client = mongodb::Client::with_uri_str(&database.event_db_url).await?;
let db = client.database(&database.event_db_name);

Expand Down Expand Up @@ -105,11 +103,12 @@ impl WatchdogClient {

let client = self.client.clone();
let tasks_store = self.tasks.clone();
let timeout = self.watchdog.http_client_timeout_secs;

tokio::spawn(async move {
let mut tasks = tasks
.into_iter()
.map(|task| execute(task, client.clone(), tasks_store.clone()))
.map(|task| execute(task, client.clone(), tasks_store.clone(), timeout))
.collect::<FuturesUnordered<_>>();

while let Some(result) = tasks.next().await {
Expand All @@ -136,22 +135,38 @@ async fn execute(
task: Task,
http_client: reqwest::Client,
tasks_store: MongoStore<Task>,
timeout: u64,
) -> Result<Id, PicaError> {
let request = http_client
let timeout = if task.r#await {
Duration::from_secs(300)
} else {
Duration::from_secs(timeout)
};

let response = http_client
.post(task.endpoint)
.timeout(timeout)
.json(&task.payload)
.send()
.await?;

let status = response.status();
let mut stream = response.bytes_stream();

while let Some(item) = stream.next().await {
tracing::debug!("Response from API {:?}", item);
tracing::info!("Response length from API {:?}", item.map(|b| b.len()));
}

tasks_store
.collection
.find_one_and_update(
doc! {
"_id": task.id.to_string() // Filter by task ID
},
doc! {
"$set": { // Use the $set operator correctly
"status": request.status().to_string(),
"$set": {
"status": status.to_string(),
"endTime": Utc::now().timestamp_millis(),
"active": false
}
Expand Down
2 changes: 1 addition & 1 deletion watchdog/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt::{Display, Formatter};

#[derive(Envconfig, Clone)] // Intentionally no Debug so secret is not printed
pub struct WatchdogConfig {
#[envconfig(from = "RATE_LIMITER_REFRESH_INTERVAL", default = "60")]
#[envconfig(from = "RATE_LIMITER_REFRESH_INTERVAL", default = "10")]
pub rate_limiter_refresh_interval: u64,
#[envconfig(from = "HTTP_CLIENT_TIMEOUT_SECS", default = "10")]
pub http_client_timeout_secs: u64,
Expand Down

0 comments on commit bf325eb

Please sign in to comment.