refactor: Parseable as the main interface #1143

Merged
merged 48 commits on Feb 12, 2025
48 commits
97eff97
refactor: encapsulate `create_stream_and_schema_from_storage`
de-sh Jan 29, 2025
ed141a2
refactor: `Parseable` holds state of the server
de-sh Jan 29, 2025
f697da8
ci: deepsource
de-sh Jan 29, 2025
afca4ba
Merge remote-tracking branch 'origin/main' into ingest
de-sh Jan 29, 2025
0399a49
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 1, 2025
9d8e530
get rid of pesky test
de-sh Feb 1, 2025
879d3bf
refactor: for readability
de-sh Feb 1, 2025
72c8211
doc: `Parseable`
de-sh Feb 1, 2025
1a302f6
refactor: list and len
de-sh Feb 1, 2025
ef4eb93
refactor: improve readability of stream meta setup
de-sh Feb 1, 2025
10e6a3a
saved from double put to storage
de-sh Feb 1, 2025
b90f700
refactor: review suggestion from @nitisht
de-sh Feb 1, 2025
78c83d9
doc: add license header
de-sh Feb 1, 2025
d41de7f
don't clone
de-sh Feb 1, 2025
7907c02
refactor: improve cyclomatic complexity
de-sh Feb 1, 2025
ff0e424
refactor: encapsulate ingestor utils
de-sh Feb 1, 2025
86d719f
refactor: `custom_partition: Option<String>`
de-sh Feb 1, 2025
f3f8f04
perf: `update_custom_partition_in_stream`
de-sh Feb 1, 2025
0da1d79
refactor: `fn name()`
de-sh Feb 2, 2025
d4fa8fe
fix: windows build
de-sh Feb 2, 2025
414e782
fix: check if static schema flag is set
de-sh Feb 2, 2025
b076e4a
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 3, 2025
9ea948f
delete unused code
de-sh Feb 3, 2025
16aeece
Merge branch 'main' into ingest
de-sh Feb 4, 2025
3de3de3
refactor: Manage staging with `Stream`
de-sh Feb 4, 2025
27fcc47
refactor: rationalize and use Arc instead of lifetimes
de-sh Feb 4, 2025
54037d0
refactor: props as method
de-sh Feb 4, 2025
0412c95
propagate stream management code up to `Parseable`
de-sh Feb 4, 2025
54c5101
DRY: hashing for id
de-sh Feb 4, 2025
8abbbd0
refactor: shared ingestor meta
de-sh Feb 4, 2025
b5831e9
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 5, 2025
80aee67
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 6, 2025
e489692
refactor: `Parseable.store_ingestor_metadata()`
de-sh Feb 6, 2025
612f104
doc: `flush_all_streams`
de-sh Feb 6, 2025
71cd7cc
refactor: merge if-blocks
de-sh Feb 6, 2025
9b11e78
`delete_stream` -> `streams.delete`
de-sh Feb 6, 2025
cee7222
`schema()` -> `get_schema()`
de-sh Feb 6, 2025
e644335
`stream_type` -> `get_stream_type`
de-sh Feb 6, 2025
1ebc843
rm unused method
de-sh Feb 6, 2025
49d2308
refactor: get stream handle first and then metadata
de-sh Feb 6, 2025
ef53501
remove unused method that leaked in during merge
de-sh Feb 6, 2025
4cd49b7
get rid of `stream_exists`
de-sh Feb 6, 2025
bca8f4a
undo merge change that breaks expectation
de-sh Feb 6, 2025
12a9ad5
refactor: move `set_metadata`
de-sh Feb 7, 2025
06c0009
associate `create_internal_stream_if_not_exists` with `Parseable`
de-sh Feb 7, 2025
614af32
Merge branch 'main' into ingest
de-sh Feb 7, 2025
ad92bfd
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 10, 2025
88a03c2
Merge remote-tracking branch 'origin/main' into ingest
de-sh Feb 11, 2025
2 changes: 1 addition & 1 deletion .gitignore
@@ -10,7 +10,7 @@ key.pem
helm-releases/.DS_Store
.DS_Store
env-file
parseable
parseable/*
parseable_*
parseable-env-secret
cache
17 changes: 9 additions & 8 deletions src/about.rs
@@ -17,10 +17,6 @@
*
*/

use crate::analytics;
use crate::option::Config;
use crate::storage::StorageMetadata;
use crate::utils::update::{self, LatestRelease};
use chrono::Duration;
use chrono_humanize::{Accuracy, Tense};
use crossterm::style::Stylize;
@@ -30,6 +26,11 @@ use std::path::Path;
use sysinfo::System;
use ulid::Ulid;

use crate::analytics;
use crate::cli::Options;
use crate::storage::StorageMetadata;
use crate::utils::update::{self, LatestRelease};

// Expose some static variables for internal usage
pub static LATEST_RELEASE: OnceCell<Option<LatestRelease>> = OnceCell::new();

@@ -99,7 +100,7 @@ impl ParseableVersion {

pub fn print_about(
current_version: semver::Version,
latest_release: Option<update::LatestRelease>,
latest_release: Option<LatestRelease>,
commit_hash: String,
) {
eprint!(
@@ -123,7 +124,7 @@
);
}

fn print_latest_release(latest_release: update::LatestRelease) {
fn print_latest_release(latest_release: LatestRelease) {
let time_since_latest_release = chrono::Utc::now() - latest_release.date;
let time_since_latest_release = humanize_time(time_since_latest_release);
let fmt_latest_version = format!(
@@ -133,10 +134,10 @@ fn print_latest_release(latest_release: update::LatestRelease) {
eprint!("{}", fmt_latest_version.red());
}

pub async fn print(config: &Config, meta: &StorageMetadata) {
pub async fn print(options: &Options, meta: &StorageMetadata) {
// print current version
let current = current();
let latest_release = if config.options.check_update {
let latest_release = if options.check_update {
update::get_latest(&meta.deployment_id).await.ok()
} else {
None
10 changes: 5 additions & 5 deletions src/alerts/mod.rs
@@ -36,7 +36,7 @@ use ulid::Ulid;
pub mod alerts_utils;
pub mod target;

use crate::option::CONFIG;
use crate::parseable::PARSEABLE;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::map::SessionKey;
use crate::storage;
@@ -650,8 +650,8 @@ impl AlertConfig {
fn get_context(&self) -> Context {
let deployment_instance = format!(
"{}://{}",
CONFIG.options.get_scheme(),
CONFIG.options.address
PARSEABLE.options.get_scheme(),
PARSEABLE.options.address
);
let deployment_id = storage::StorageMetadata::global().deployment_id;
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
@@ -730,7 +730,7 @@ impl Alerts {
/// Loads alerts from disk, blocks
pub async fn load(&self) -> Result<(), AlertError> {
let mut map = self.alerts.write().await;
let store = CONFIG.storage().get_object_store();
let store = PARSEABLE.storage.get_object_store();

for alert in store.get_alerts().await.unwrap_or_default() {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
@@ -792,7 +792,7 @@
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError> {
let store = CONFIG.storage().get_object_store();
let store = PARSEABLE.storage.get_object_store();

// read and modify alert
let mut alert = self.get_alert_by_id(alert_id).await?;
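The alerts changes show the pattern repeated across this PR: the free-standing `CONFIG` static gives way to a `PARSEABLE` handle that owns the options, storage, and stream state of the server. A minimal sketch of that shape, with illustrative placeholder types only (the struct introduced by the PR carries more fields):

```rust
use once_cell::sync::Lazy;

// Placeholder types; the PR's real Options, Streams and storage are richer.
#[derive(Default)]
pub struct Options {
    pub address: String,
    pub check_update: bool,
}

#[derive(Default)]
pub struct Streams; // stands in for the in-memory map of stream handles

#[derive(Default)]
pub struct Parseable {
    pub options: Options,
    pub streams: Streams,
}

// One global entry point into server state, replacing `option::CONFIG`.
pub static PARSEABLE: Lazy<Parseable> = Lazy::new(Parseable::default);

fn main() {
    // Callers read configuration and state through the same handle.
    println!("check_update = {}", PARSEABLE.options.check_update);
}
```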
35 changes: 20 additions & 15 deletions src/analytics.rs
@@ -16,16 +16,6 @@
*
*
*/

use crate::about::{current, platform};
use crate::handlers::http::cluster::utils::check_liveness;
use crate::handlers::http::{base_path_without_preceding_slash, cluster};
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::option::{Mode, CONFIG};
use crate::{metadata, stats};
use crate::{storage, HTTP_CLIENT};

use crate::stats::Stats;
use actix_web::{web, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use clokwerk::{AsyncScheduler, Interval};
@@ -40,6 +30,21 @@ use sysinfo::System;
use tracing::{error, info};
use ulid::Ulid;

use crate::{
about::{current, platform},
handlers::{
http::{
base_path_without_preceding_slash,
cluster::{self, utils::check_liveness},
},
STREAM_NAME_HEADER_KEY,
},
option::Mode,
parseable::PARSEABLE,
stats::{self, Stats},
storage, HTTP_CLIENT,
};

const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
const ANALYTICS_SEND_INTERVAL_SECONDS: Interval = clokwerk::Interval::Hours(1);

@@ -111,8 +116,8 @@ impl Report {
cpu_count,
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.options.mode,
storage_mode: PARSEABLE.get_storage_mode_string().to_string(),
server_mode: PARSEABLE.options.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
@@ -148,7 +153,7 @@ pub async fn get_analytics(_: HttpRequest) -> impl Responder {
}

fn total_streams() -> usize {
metadata::STREAM_INFO.list_streams().len()
PARSEABLE.streams.len()
}

fn total_event_stats() -> (Stats, Stats, Stats) {
@@ -164,7 +169,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
let mut deleted_parquet_bytes: u64 = 0;
let mut deleted_json_bytes: u64 = 0;

for stream in metadata::STREAM_INFO.list_streams() {
for stream in PARSEABLE.streams.list() {
let Some(stats) = stats::get_current_stats(&stream, "json") else {
continue;
};
@@ -219,7 +224,7 @@ async fn fetch_ingestors_metrics(
let mut vec = vec![];
let mut active_ingestors = 0u64;
let mut offline_ingestors = 0u64;
if CONFIG.options.mode == Mode::Query {
if PARSEABLE.options.mode == Mode::Query {
// send analytics for ingest servers

// ingestor infos should be valid here, if not some thing is wrong
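In analytics, per-stream iteration moves from `metadata::STREAM_INFO.list_streams()` to `PARSEABLE.streams.list()` and `PARSEABLE.streams.len()`. A hedged sketch of a `Streams` wrapper exposing those two accessors over a lock-guarded map; the field layout is an assumption, not taken from the PR:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default)]
pub struct Streams(RwLock<HashMap<String, ()>>);

impl Streams {
    /// Snapshot of the currently known stream names.
    pub fn list(&self) -> Vec<String> {
        self.0.read().expect("lock poisoned").keys().cloned().collect()
    }

    /// Number of streams currently registered.
    pub fn len(&self) -> usize {
        self.0.read().expect("lock poisoned").len()
    }
}

fn main() {
    let streams = Streams::default();
    streams
        .0
        .write()
        .expect("lock poisoned")
        .insert("audit_log".to_string(), ());
    assert_eq!(streams.len(), streams.list().len());
}
```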
14 changes: 9 additions & 5 deletions src/audit.rs
@@ -21,9 +21,8 @@ use std::{
fmt::{Debug, Display},
};

use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};
use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT};

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use serde::Serialize;
@@ -47,7 +46,12 @@ impl AuditLogger {
// Try to construct the log endpoint URL by joining the base URL
// with the ingest path, This can fail if the URL is not valid,
// when the base URL is not set or the ingest path is not valid
let log_endpoint = match CONFIG.options.audit_logger.as_ref()?.join("/api/v1/ingest") {
let log_endpoint = match PARSEABLE
.options
.audit_logger
.as_ref()?
.join("/api/v1/ingest")
{
Ok(url) => url,
Err(err) => {
eprintln!("Couldn't setup audit logger: {err}");
@@ -66,8 +70,8 @@
.header("x-p-stream", "audit_log");

// Use basic auth if credentials are configured
if let Some(username) = CONFIG.options.audit_username.as_ref() {
req = req.basic_auth(username, CONFIG.options.audit_password.as_ref())
if let Some(username) = PARSEABLE.options.audit_username.as_ref() {
req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref())
}

match req.send().await {
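The audit logger keeps the same request-building flow, only reading its settings from `PARSEABLE.options`. For reference, a self-contained sketch of the "attach basic auth only when credentials are configured" pattern, with placeholder endpoint, payload, and credentials (requires the `reqwest` and `tokio` crates):

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical configuration; the real values come from PARSEABLE.options.
    let audit_username: Option<String> = Some("audit".to_string());
    let audit_password: Option<String> = Some("secret".to_string());

    let mut req = reqwest::Client::new()
        .post("https://logs.example.com/api/v1/ingest")
        .header("x-p-stream", "audit_log")
        .body(r#"{"actor":"admin","action":"login"}"#);

    // Basic auth is only attached when a username is configured.
    if let Some(username) = audit_username.as_ref() {
        req = req.basic_auth(username, audit_password.as_ref());
    }

    let resp = req.send().await?;
    println!("audit log response: {}", resp.status());
    Ok(())
}
```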
12 changes: 6 additions & 6 deletions src/banner.rs
@@ -21,14 +21,14 @@ use crossterm::style::Stylize;

use crate::about;
use crate::utils::uid::Uid;
use crate::{option::Config, storage::StorageMetadata};
use crate::{parseable::Parseable, storage::StorageMetadata};

pub async fn print(config: &Config, meta: &StorageMetadata) {
pub async fn print(config: &Parseable, meta: &StorageMetadata) {
print_ascii_art();
let scheme = config.options.get_scheme();
status_info(config, &scheme, meta.deployment_id);
storage_info(config).await;
about::print(config, meta).await;
about::print(&config.options, meta).await;
println!();
}

@@ -46,7 +46,7 @@ fn print_ascii_art() {
eprint!("{ascii_name}");
}

fn status_info(config: &Config, scheme: &str, id: Uid) {
fn status_info(config: &Parseable, scheme: &str, id: Uid) {
let address = format!(
"\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
scheme,
Expand All @@ -59,7 +59,7 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
let mut credentials =
String::from("\"As set in P_USERNAME and P_PASSWORD environment variables\"");

if config.is_default_creds() {
if config.options.is_default_creds() {
credentials = "\"Using default creds admin, admin. Please set credentials with P_USERNAME and P_PASSWORD.\"".red().to_string();
}

@@ -93,7 +93,7 @@ fn status_info(config: &Parseable, scheme: &str, id: Uid) {
/// - Mode (`Local drive`, `S3 bucket`)
/// - Staging (temporary landing point for incoming events)
/// - Store (path where the data is stored and its latency)
async fn storage_info(config: &Config) {
async fn storage_info(config: &Parseable) {
let storage = config.storage();
let latency = storage.get_object_store().get_latency().await;

61 changes: 31 additions & 30 deletions src/catalog/mod.rs
@@ -18,30 +18,31 @@

use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers;
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
use crate::stats::{
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
};
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
query::PartialTimeFilter,
storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
};
use chrono::{DateTime, Local, NaiveTime, Utc};
use column::Column;
use manifest::Manifest;
use relative_path::RelativePathBuf;
use snapshot::ManifestItem;
use std::io::Error as IOError;
use tracing::{error, info};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
handlers::{self, http::base_path_without_preceding_slash},
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
option::Mode,
parseable::PARSEABLE,
query::PartialTimeFilter,
stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
storage::{
object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat,
},
};
pub use manifest::create_from_parquet_file;

pub mod column;
pub mod manifest;
pub mod snapshot;
use crate::storage::ObjectStoreFormat;
pub use manifest::create_from_parquet_file;
pub trait Snapshot {
fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem>;
}
@@ -263,7 +264,7 @@ async fn create_manifest(
files: vec![change],
..Manifest::default()
};
let mut first_event_at = STREAM_INFO.get_first_event(stream_name)?;
let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event();
if first_event_at.is_none() {
if let Some(first_event) = manifest.files.first() {
let time_partition = &meta.time_partition;
@@ -279,13 +280,11 @@
}
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) =
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
match PARSEABLE.get_stream(stream_name) {
Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
Err(err) => error!(
"Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
),
}
}
}
@@ -332,11 +331,11 @@ pub async fn remove_manifest_from_snapshot(
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
STREAM_INFO.reset_first_event_at(stream_name)?;
PARSEABLE.get_stream(stream_name)?.reset_first_event_at();
meta.first_event_at = None;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.options.mode {
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
@@ -350,10 +349,10 @@ pub async fn get_first_event(
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
let mut first_event_at: String = String::default();
match CONFIG.options.mode {
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
// get current snapshot
let stream_first_event = STREAM_INFO.get_first_event(stream_name)?;
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
if stream_first_event.is_some() {
first_event_at = stream_first_event.unwrap();
} else {
Expand Down Expand Up @@ -393,7 +392,9 @@ pub async fn get_first_event(
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
PARSEABLE
.get_stream(stream_name)?
.set_first_event_at(&first_event_at);
}
}
}
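The catalog module now fetches a stream handle first and then reads or writes its metadata (`get_first_event`, `set_first_event_at`, `reset_first_event_at`), rather than keying every call on the stream name through `STREAM_INFO`. A self-contained approximation of that handle pattern; field and error types here are assumptions for illustration:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

#[derive(Default)]
pub struct Stream {
    first_event_at: Mutex<Option<String>>,
}

impl Stream {
    pub fn get_first_event(&self) -> Option<String> {
        self.first_event_at.lock().unwrap().clone()
    }

    pub fn set_first_event_at(&self, at: &str) {
        *self.first_event_at.lock().unwrap() = Some(at.to_owned());
    }

    pub fn reset_first_event_at(&self) {
        self.first_event_at.lock().unwrap().take();
    }
}

#[derive(Default)]
pub struct Parseable {
    streams: RwLock<HashMap<String, Arc<Stream>>>,
}

impl Parseable {
    /// Look up a stream once; the caller keeps the Arc and makes several calls on it.
    pub fn get_stream(&self, name: &str) -> Result<Arc<Stream>, String> {
        self.streams
            .read()
            .unwrap()
            .get(name)
            .cloned()
            .ok_or_else(|| format!("unknown stream: {name}"))
    }
}

fn main() -> Result<(), String> {
    let parseable = Parseable::default();
    parseable
        .streams
        .write()
        .unwrap()
        .insert("demo".to_string(), Arc::new(Stream::default()));

    let stream = parseable.get_stream("demo")?;
    stream.set_first_event_at("2025-02-12T00:00:00Z");
    assert!(stream.get_first_event().is_some());
    stream.reset_first_event_at();
    Ok(())
}
```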
4 changes: 4 additions & 0 deletions src/cli.rs
@@ -381,4 +381,8 @@ impl Options {
origin,
})
}

pub fn is_default_creds(&self) -> bool {
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
}
}
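`is_default_creds` moves the default-credential check onto `Options`, which is what the banner code above now calls. A standalone approximation: the `admin`/`admin` defaults come from the banner warning text, while the constant names are assumptions standing in for those defined elsewhere in cli.rs.

```rust
// Assumed constant names; the actual definitions live elsewhere in cli.rs.
const DEFAULT_USERNAME: &str = "admin";
const DEFAULT_PASSWORD: &str = "admin";

struct Options {
    username: String,
    password: String,
}

impl Options {
    fn is_default_creds(&self) -> bool {
        self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
    }
}

fn main() {
    let opts = Options { username: "admin".into(), password: "admin".into() };
    if opts.is_default_creds() {
        eprintln!("Using default creds admin, admin. Please set P_USERNAME and P_PASSWORD.");
    }
}
```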