Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Feb 10, 2025
2 parents 614af32 + 6346928 commit ad92bfd
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 194 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:

docker-compose-test:
name: Quest Smoke and Load Tests for Standalone deployments
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -23,7 +23,7 @@ jobs:

docker-compose-distributed-test:
name: Quest Smoke and Load Tests for Distributed deployments
runs-on: ubuntu-latest
runs-on: self-hosted
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions docker-compose-distributed-test-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ services:
quest:
platform: linux/amd64
image: ghcr.io/parseablehq/quest:main
pull_policy: always
command:
[
"load",
Expand Down
1 change: 1 addition & 0 deletions docker-compose-distributed-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ services:
quest:
platform: linux/amd64
image: ghcr.io/parseablehq/quest:main
pull_policy: always
command:
[
"load",
Expand Down
3 changes: 2 additions & 1 deletion docker-compose-test-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ networks:

services:
minio:
image: minio/minio:RELEASE.2023-02-10T18-48-39Z
image: minio/minio:RELEASE.2025-02-03T21-03-04Z
entrypoint:
- sh
- -euc
Expand Down Expand Up @@ -67,6 +67,7 @@ services:
quest:
image: ghcr.io/parseablehq/quest:main
platform: linux/amd64
pull_policy: always
command: [
"load",
"http://parseable:8000",
Expand Down
3 changes: 2 additions & 1 deletion docker-compose-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ networks:

services:
minio:
image: minio/minio:RELEASE.2023-02-10T18-48-39Z
image: minio/minio:RELEASE.2025-02-03T21-03-04Z
entrypoint:
- sh
- -euc
Expand Down Expand Up @@ -60,6 +60,7 @@ services:

quest:
image: ghcr.io/parseablehq/quest:main
pull_policy: always
platform: linux/amd64
command: [
"load",
Expand Down
17 changes: 12 additions & 5 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
use serde_json::Error as SerdeError;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{trace, warn};
Expand Down Expand Up @@ -733,10 +733,17 @@ impl Alerts {
let store = PARSEABLE.storage.get_object_store();

for alert in store.get_alerts().await.unwrap_or_default() {
let (handle, rx, tx) =
schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;

self.update_task(alert.id, handle, rx, tx).await;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

self.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

map.insert(alert.id, alert);
}
Expand Down
27 changes: 23 additions & 4 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use actix_web::{
HttpRequest, Responder,
};
use bytes::Bytes;
use tokio::sync::oneshot;
use ulid::Ulid;

use crate::alerts::{
Expand Down Expand Up @@ -55,7 +56,14 @@ pub async fn post(
user_auth_for_query(&session_key, &alert.query).await?;

// create scheduled tasks
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

// now that we've validated that the user can run this query
// move on to saving the alert in ObjectStore
Expand All @@ -67,7 +75,9 @@ pub async fn post(
let alert_bytes = serde_json::to_vec(&alert)?;
store.put_object(&path, Bytes::from(alert_bytes)).await?;

ALERTS.update_task(alert.id, handle, rx, tx).await;
ALERTS
.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

Ok(web::Json(alert))
}
Expand Down Expand Up @@ -136,7 +146,14 @@ pub async fn modify(
alert.validate().await?;

// modify task
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
let handle = schedule_alert_task(
alert.get_eval_frequency(),
alert.clone(),
inbox_rx,
outbox_tx,
)?;

// modify on disk
PARSEABLE
Expand All @@ -148,7 +165,9 @@ pub async fn modify(
// modify in memory
ALERTS.update(&alert).await;

ALERTS.update_task(alert.id, handle, rx, tx).await;
ALERTS
.update_task(alert.id, handle, outbox_rx, inbox_tx)
.await;

Ok(web::Json(alert))
}
Expand Down
61 changes: 9 additions & 52 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::thread;

use actix_web::web;
use actix_web::Scope;
use actix_web_prometheus::PrometheusMetrics;
Expand All @@ -25,7 +27,6 @@ use bytes::Bytes;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::oneshot;
use tracing::error;

use crate::{
analytics,
Expand Down Expand Up @@ -100,65 +101,21 @@ impl ParseableServer for IngestServer {

migration::run_migration(&PARSEABLE).await?;

let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;
let (
mut remote_conversion_handler,
mut remote_conversion_outbox,
mut remote_conversion_inbox,
) = sync::arrow_conversion().await;
// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

tokio::spawn(airplane::server());

// write the ingestor metadata to storage
PARSEABLE.store_ingestor_metadata().await?;

// Ingestors shouldn't have to deal with OpenId auth flow
let app = self.start(shutdown_rx, prometheus.clone(), None);

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
}
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

}
}
result
}
}

Expand Down
47 changes: 11 additions & 36 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::thread;

use crate::alerts::ALERTS;
use crate::correlation::CORRELATIONS;
use crate::handlers::airplane;
Expand All @@ -26,10 +28,9 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::handlers::http::{rbac, role};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use crate::{analytics, migration, storage};
use crate::{analytics, migration, storage, sync};
use actix_web::web::{resource, ServiceConfig};
use actix_web::{web, Scope};
use actix_web_prometheus::PrometheusMetrics;
Expand Down Expand Up @@ -132,44 +133,18 @@ impl ParseableServer for QueryServer {
hot_tier_manager.put_internal_stream_hot_tier().await?;
hot_tier_manager.download_from_s3()?;
};
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
sync::run_local_sync().await;
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
sync::object_store_sync().await;

// Run sync on a background thread
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

tokio::spawn(airplane::server());
let app = self.start(shutdown_rx, prometheus.clone(), PARSEABLE.options.openid());

tokio::pin!(app);
loop {
tokio::select! {
e = &mut app => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining localsync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
return e
},
_ = &mut localsync_outbox => {
// crash the server if localsync fails for any reason
// panic!("Local Sync thread died. Server will fail now!")
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
},
_ = &mut remote_sync_outbox => {
// remote_sync failed, this is recoverable by just starting remote_sync thread again
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
}
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");

};
}
result
}
}

Expand Down
Loading

0 comments on commit ad92bfd

Please sign in to comment.