diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 2fba1d5b5..10da8c1b0 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -761,16 +761,14 @@ impl Parseable {
             .await
         {
             error!(
-                "Failed to update first_event_at in storage for stream {:?}: {err:?}",
-                stream_name
+                "Failed to update first_event_at in storage for stream {stream_name:?}: {err:?}"
             );
         }
 
         match self.get_stream(stream_name) {
             Ok(stream) => stream.set_first_event_at(first_event_at),
             Err(err) => error!(
-                "Failed to update first_event_at in stream info for stream {:?}: {err:?}",
-                stream_name
+                "Failed to update first_event_at in stream info for stream {stream_name:?}: {err:?}"
             ),
         }
 
diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index b34e1e6d4..bec8c4a46 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -15,40 +15,46 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  *
  */
-use super::object_storage::parseable_json_path;
-use super::{
-    ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
-    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+
+use std::{
+    collections::{BTreeMap, HashSet},
+    path::Path,
+    sync::Arc,
+    time::{Duration, Instant},
 };
+
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::object_store::{
-    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+use datafusion::{
+    datasource::listing::ListingTableUrl,
+    execution::{
+        object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
+        runtime_env::RuntimeEnvBuilder,
+    },
+};
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use object_store::{
+    azure::{MicrosoftAzure, MicrosoftAzureBuilder},
+    limit::LimitStore,
+    path::Path as StorePath,
+    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
 };
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
-use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
-use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
-use std::path::Path as StdPath;
 use tracing::{error, info};
 use url::Url;
 
-use super::metrics_layer::MetricLayer;
-use crate::handlers::http::users::USERS_ROOT_DIR;
-use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
-use crate::metrics::storage::StorageMetrics;
-use crate::parseable::LogStream;
-use object_store::limit::LimitStore;
-use object_store::path::Path as StorePath;
-use std::collections::{BTreeMap, HashMap, HashSet};
-use std::sync::Arc;
-use std::time::{Duration, Instant};
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
+    parseable::LogStream,
+};
 
-const CONNECT_TIMEOUT_SECS: u64 = 5;
-const REQUEST_TIMEOUT_SECS: u64 = 300;
+use super::{
+    metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
+    ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
+    PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
+    STREAM_ROOT_DIRECTORY,
+};
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -161,7 +167,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
         let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
         let azure = MetricLayer::new(azure);
 
-        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let object_store_registry = DefaultObjectStoreRegistry::new();
         let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account))
             .unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(azure));
@@ -190,10 +196,6 @@ impl ObjectStorageProvider for AzureBlobConfig {
     }
 }
 
-pub fn to_object_store_path(path: &RelativePath) -> StorePath {
-    StorePath::from(path.as_str())
-}
-
 // ObjStoreClient is generic client to enable interactions with different cloudprovider's
 // object store such as S3 and Azure Blob
 #[derive(Debug)]
@@ -347,7 +349,7 @@ impl BlobStore {
         }
         Ok(result_file_list)
     }
-    async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let instant = Instant::now();
 
         // // TODO: Uncomment this when multipart is fixed
@@ -376,7 +378,7 @@ impl BlobStore {
     }
 
     // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
    //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
    //     let mut file = OpenOptions::new().read(true).open(path).await?;
 
@@ -623,7 +625,7 @@ impl ObjectStorage for BlobStore {
         Ok(files)
     }
 
-    async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())
@@ -663,126 +665,21 @@ impl ObjectStorage for BlobStore {
             .collect::<Vec<_>>())
     }
 
-    async fn get_all_dashboards(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_dashboard_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
-            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
-            let dashboard_bytes = self
-                .get_objects(
-                    Some(&dashboards_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            dashboards
-                .entry(dashboards_path)
-                .or_default()
-                .extend(dashboard_bytes);
-        }
-        Ok(dashboards)
-    }
-
-    async fn get_all_saved_filters(
+    async fn list_dirs_relative(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let prefix = object_store::path::Path::from(relative_path.as_str());
+        let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
 
-        let users = resp
+        Ok(resp
             .common_prefixes
             .iter()
             .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
             .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_filters_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
-            let resp = self
-                .client
-                .list_with_delimiter(Some(&user_filters_path))
-                .await?;
-            let streams = resp
-                .common_prefixes
-                .iter()
-                .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-                .map(|name| name.as_ref().to_string())
-                .collect::<Vec<_>>();
-            for stream in streams {
-                let filters_path = RelativePathBuf::from(&stream);
-                let filter_bytes = self
-                    .get_objects(
-                        Some(&filters_path),
-                        Box::new(|file_name| file_name.ends_with(".json")),
-                    )
-                    .await?;
-                filters
-                    .entry(filters_path)
-                    .or_default()
-                    .extend(filter_bytes);
-            }
-        }
-        Ok(filters)
+            .collect::<Vec<_>>())
     }
 
-    ///fetch all correlations uploaded in object store
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_correlation_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations"));
-            let correlations_path = RelativePathBuf::from(&user_correlation_path);
-            let correlation_bytes = self
-                .get_objects(
-                    Some(&correlations_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            correlations
-                .entry(correlations_path)
-                .or_default()
-                .extend(correlation_bytes);
-        }
-        Ok(correlations)
-    }
-
     fn get_bucket_name(&self) -> String {
         self.container.clone()
     }
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 0e0d092c2..59d06d264 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -17,7 +17,7 @@
  */
 
 use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{BTreeMap, HashSet},
     path::{Path, PathBuf},
     sync::Arc,
     time::Instant,
@@ -27,16 +27,17 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
 use fs_extra::file::CopyOptions;
-use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use futures::{stream::FuturesUnordered, TryStreamExt};
 use relative_path::{RelativePath, RelativePathBuf};
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
 
 use crate::{
     handlers::http::users::USERS_ROOT_DIR,
     metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
+    option::validation,
+    parseable::LogStream,
 };
-use crate::{option::validation, parseable::LogStream};
 
 use super::{
     ObjectStorage, ObjectStorageError, ObjectStorageProvider, ALERTS_ROOT_DIRECTORY,
@@ -357,106 +358,25 @@ impl ObjectStorage for LocalFS {
         Ok(dirs)
     }
 
-    async fn get_all_dashboards(
+    async fn list_dirs_relative(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        let users: Vec<DirEntry> = directories.try_collect().await?;
-        for user in users {
-            if !user.path().is_dir() {
-                continue;
-            }
-            let dashboards_path = users_root_path.join(user.path()).join("dashboards");
-            let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
-            let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
-            for dashboard in dashboards_files {
-                let dashboard_absolute_path = dashboard.path();
-                let file = fs::read(dashboard_absolute_path.clone()).await?;
-                let dashboard_relative_path = dashboard_absolute_path
-                    .strip_prefix(self.root.as_path())
-                    .unwrap();
-
-                dashboards
-                    .entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap())
-                    .or_default()
-                    .push(file.into());
-            }
-        }
-        Ok(dashboards)
-    }
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let root = self.root.join(relative_path.as_str());
+        let dirs = ReadDirStream::new(fs::read_dir(root).await?)
+            .try_collect::<Vec<DirEntry>>()
+            .await?
+            .into_iter()
+            .map(dir_name);
 
-    async fn get_all_saved_filters(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        let users: Vec<DirEntry> = directories.try_collect().await?;
-        for user in users {
-            if !user.path().is_dir() {
-                continue;
-            }
-            let stream_root_path = users_root_path.join(user.path()).join("filters");
-            let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?);
-            let streams: Vec<DirEntry> = directories.try_collect().await?;
-            for stream in streams {
-                if !stream.path().is_dir() {
-                    continue;
-                }
-                let filters_path = users_root_path
-                    .join(user.path())
-                    .join("filters")
-                    .join(stream.path());
-                let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
-                let filters_files: Vec<DirEntry> = directories.try_collect().await?;
-                for filter in filters_files {
-                    let filter_absolute_path = filter.path();
-                    let file = fs::read(filter_absolute_path.clone()).await?;
-                    let filter_relative_path = filter_absolute_path
-                        .strip_prefix(self.root.as_path())
-                        .unwrap();
-
-                    filters
-                        .entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
-                        .or_default()
-                        .push(file.into());
-                }
-            }
-        }
-        Ok(filters)
-    }
+        let dirs = FuturesUnordered::from_iter(dirs)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
 
-    ///fetch all correlations stored in disk
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        while let Some(user) = directories.next().await {
-            let user = user?;
-            if !user.path().is_dir() {
-                continue;
-            }
-            let correlations_path = users_root_path.join(user.path()).join("correlations");
-            let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?);
-            while let Some(correlation) = files.next().await {
-                let correlation_absolute_path = correlation?.path();
-                let file = fs::read(correlation_absolute_path.clone()).await?;
-                let correlation_relative_path = correlation_absolute_path
-                    .strip_prefix(self.root.as_path())
-                    .unwrap();
-
-                correlations
-                    .entry(RelativePathBuf::from_path(correlation_relative_path).unwrap())
-                    .or_default()
-                    .push(file.into());
-            }
-        }
-        Ok(correlations)
+        Ok(dirs)
     }
 
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index e02094584..3be5bfc37 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -16,6 +16,11 @@
  *
  */
 
+use chrono::Local;
+use object_store::path::Path;
+use relative_path::RelativePath;
+use serde::{Deserialize, Serialize};
+
 use crate::{
     catalog::snapshot::Snapshot,
     event::format::LogSource,
@@ -26,15 +31,10 @@ use crate::{
     utils::json::{deserialize_string_as_true, serialize_bool_as_true},
 };
 
-use chrono::Local;
-use serde::{Deserialize, Serialize};
-
-use std::fmt::Debug;
-
 mod azure_blob;
 mod localfs;
 mod metrics_layer;
-pub(crate) mod object_storage;
+pub mod object_storage;
 pub mod retention;
 mod s3;
 mod store_metadata;
@@ -78,6 +78,9 @@ const ACCESS_ALL: &str = "all";
 pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5";
 pub const CURRENT_SCHEMA_VERSION: &str = "v5";
 
+const CONNECT_TIMEOUT_SECS: u64 = 5;
+const REQUEST_TIMEOUT_SECS: u64 = 300;
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ObjectStoreFormat {
     /// Version of schema registry
@@ -260,3 +263,7 @@ pub enum ObjectStorageError {
     #[error("{0}")]
     StandaloneWithDistributed(#[from] StandaloneWithDistributed),
 }
+
+pub fn to_object_store_path(path: &RelativePath) -> Path {
+    Path::from(path.as_str())
+}
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index bc022ecd1..a6ba5558e 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -16,25 +16,16 @@
  *
  */
 
-use super::{
-    retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata,
-    ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
-    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
-};
-
-use crate::alerts::AlertConfig;
-use crate::correlation::{CorrelationConfig, CorrelationError};
-use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
-use crate::handlers::http::users::{CORRELATION_DIR, DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
-use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
-use crate::option::Mode;
-use crate::parseable::LogStream;
-use crate::{
-    catalog::{self, manifest::Manifest, snapshot::Snapshot},
-    metrics::{storage::StorageMetrics, STORAGE_SIZE},
-    parseable::PARSEABLE,
-    stats::FullStats,
-};
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::fmt::Debug;
+use std::fs::{remove_file, File};
+use std::num::NonZeroU32;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Duration;
+use std::time::Instant;
 
 use actix_web_prometheus::PrometheusMetrics;
 use arrow_schema::Schema;
@@ -48,16 +39,23 @@ use relative_path::RelativePathBuf;
 use tracing::{debug, error, warn};
 use ulid::Ulid;
 
-use std::collections::{BTreeMap, HashSet};
-use std::fmt::Debug;
-use std::fs::File;
-use std::num::NonZeroU32;
-use std::{
-    collections::HashMap,
-    fs,
-    path::Path,
-    sync::Arc,
-    time::{Duration, Instant},
+use crate::alerts::AlertConfig;
+use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
+use crate::correlation::{CorrelationConfig, CorrelationError};
+use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
+use crate::handlers::http::users::CORRELATION_DIR;
+use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
+use crate::metrics::storage::StorageMetrics;
+use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
+use crate::option::Mode;
+use crate::parseable::LogStream;
+use crate::parseable::PARSEABLE;
+use crate::stats::FullStats;
+
+use super::{
+    retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata,
+    ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
+    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
@@ -93,15 +91,87 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     async fn list_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
     async fn list_old_streams(&self) -> Result<HashSet<LogStream>, ObjectStorageError>;
     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
+    async fn list_dirs_relative(
+        &self,
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError>;
+
     async fn get_all_saved_filters(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let stream_dir = users_dir.join(&user).join("filters");
+            for stream in self.list_dirs_relative(&stream_dir).await? {
+                let filters_path = stream_dir.join(&stream);
+                let filter_bytes = self
+                    .get_objects(
+                        Some(&filters_path),
+                        Box::new(|file_name| file_name.ends_with(".json")),
+                    )
+                    .await?;
+                filters
+                    .entry(filters_path)
+                    .or_default()
+                    .extend(filter_bytes);
+            }
+        }
+
+        Ok(filters)
+    }
+
     async fn get_all_dashboards(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let dashboards_path = users_dir.join(&user).join("dashboards");
+            let dashboard_bytes = self
+                .get_objects(
+                    Some(&dashboards_path),
+                    Box::new(|file_name| file_name.ends_with(".json")),
+                )
+                .await?;
+
+            dashboards
+                .entry(dashboards_path)
+                .or_default()
+                .extend(dashboard_bytes);
+        }
+
+        Ok(dashboards)
+    }
+
+    ///fetch all correlations stored in object store
+    /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let correlations_path = users_dir.join(&user).join("correlations");
+            let correlation_bytes = self
+                .get_objects(
+                    Some(&correlations_path),
+                    Box::new(|file_name| file_name.ends_with(".json")),
+                )
+                .await?;
+
+            correlations
+                .entry(correlations_path)
+                .or_default()
+                .extend(correlation_bytes);
+        }
+
+        Ok(correlations)
+    }
+
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
     async fn list_manifest_files(
         &self,
@@ -306,7 +376,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn get_alerts(&self) -> Result<Vec<AlertConfig>, ObjectStorageError> {
-        let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]);
+        let alerts_path = RelativePathBuf::from(ALERTS_ROOT_DIRECTORY);
         let alerts = self
             .get_objects(
                 Some(&alerts_path),
@@ -630,7 +700,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn get_correlations(&self) -> Result<Vec<Bytes>, CorrelationError> {
-        let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIR]);
+        let correlation_path = RelativePathBuf::from(CORRELATION_DIR);
         let correlation_bytes = self
             .get_objects(
                 Some(&correlation_path),
@@ -652,8 +722,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         let stream = PARSEABLE.get_or_create_stream(&stream_name);
         let custom_partition = stream.get_custom_partition();
 
-        for file in stream.parquet_files() {
-            let filename = file
+        for path in stream.parquet_files() {
+            let filename = path
                 .file_name()
                 .expect("only parquet files are returned by iterator")
                 .to_str()
@@ -661,7 +731,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
             file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
-            let compressed_size = file.metadata().map_or(0, |meta| meta.len());
+            let compressed_size = path.metadata().map_or(0, |meta| meta.len());
             STORAGE_SIZE
                 .with_label_values(&["data", &stream_name, "parquet"])
                 .add(compressed_size as i64);
@@ -685,8 +755,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             let stream_relative_path = format!("{stream_name}/{file_suffix}");
 
             // Try uploading the file, handle potential errors without breaking the loop
-            if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
-                error!("Failed to upload file {}: {:?}", filename, e);
+            if let Err(e) = self.upload_file(&stream_relative_path, &path).await {
+                error!("Failed to upload file {filename:?}: {e}");
                 continue; // Skip to the next file
             }
 
@@ -695,17 +765,21 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 .to_string();
             let store = PARSEABLE.storage().get_object_store();
             let manifest =
-                catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
+                catalog::create_from_parquet_file(absolute_path.clone(), &path).unwrap();
             catalog::update_snapshot(store, &stream_name, manifest).await?;
 
-            let _ = fs::remove_file(file);
+            if let Err(e) = remove_file(path) {
+                warn!("Failed to remove staged file: {e}");
+            }
         }
 
         for path in stream.schema_files() {
             let file = File::open(&path)?;
             let schema: Schema = serde_json::from_reader(file)?;
             commit_schema_to_storage(&stream_name, schema).await?;
-            let _ = fs::remove_file(path);
+            if let Err(e) = remove_file(path) {
+                warn!("Failed to remove staged file: {e}");
+            }
         }
     }
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 5fba3ba1b..53621396b 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -16,45 +16,49 @@
  *
  */
 
+use std::{
+    collections::{BTreeMap, HashSet},
+    fmt::Display,
+    path::Path,
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::object_store::{
-    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+use datafusion::{
+    datasource::listing::ListingTableUrl,
+    execution::{
+        object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
+        runtime_env::RuntimeEnvBuilder,
+    },
+};
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use object_store::{
+    aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
+    limit::LimitStore,
+    path::Path as StorePath,
+    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
 };
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
-use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
-use object_store::limit::LimitStore;
-use object_store::path::Path as StorePath;
-use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
 use tracing::{error, info};
 
-use std::collections::{BTreeMap, HashSet};
-use std::fmt::Display;
-use std::iter::Iterator;
-use std::path::Path as StdPath;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics},
+    parseable::LogStream,
+};
 
-use super::metrics_layer::MetricLayer;
-use super::object_storage::parseable_json_path;
 use super::{
-    ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
+    ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
+    PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
+    STREAM_ROOT_DIRECTORY,
 };
-use crate::handlers::http::users::USERS_ROOT_DIR;
-use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::parseable::LogStream;
-use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
-use std::collections::HashMap;
 
 // in bytes
 // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
-const CONNECT_TIMEOUT_SECS: u64 = 5;
-const REQUEST_TIMEOUT_SECS: u64 = 300;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
 
 #[derive(Debug, Clone, clap::Args)]
@@ -296,7 +300,7 @@ impl ObjectStorageProvider for S3Config {
         let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
         let s3 = MetricLayer::new(s3);
 
-        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let object_store_registry = DefaultObjectStoreRegistry::new();
         let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(s3));
 
@@ -325,10 +329,6 @@ impl ObjectStorageProvider for S3Config {
     }
 }
 
-fn to_object_store_path(path: &RelativePath) -> StorePath {
-    StorePath::from(path.as_str())
-}
-
 #[derive(Debug)]
 pub struct S3 {
     client: LimitStore<AmazonS3>,
@@ -483,7 +483,7 @@ impl S3 {
         }
         Ok(result_file_list)
    }
-    async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let instant = Instant::now();
 
         // // TODO: Uncomment this when multipart is fixed
@@ -512,7 +512,7 @@ impl S3 {
     }
 
     // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
    //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
    //     let mut file = OpenOptions::new().read(true).open(path).await?;
 
@@ -759,7 +759,7 @@ impl ObjectStorage for S3 {
         Ok(files)
     }
 
-    async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())
@@ -795,125 +795,19 @@ impl ObjectStorage for S3 {
             .collect::<Vec<_>>())
     }
 
-    async fn get_all_dashboards(
+    async fn list_dirs_relative(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
+        let prefix = object_store::path::Path::from(relative_path.as_str());
+        let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
 
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_dashboard_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
-            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
-            let dashboard_bytes = self
-                .get_objects(
-                    Some(&dashboards_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            dashboards
-                .entry(dashboards_path)
-                .or_default()
-                .extend(dashboard_bytes);
-        }
-        Ok(dashboards)
-    }
-
-    async fn get_all_saved_filters(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_filters_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
-            let resp = self
-                .client
-                .list_with_delimiter(Some(&user_filters_path))
-                .await?;
-            let streams = resp
-                .common_prefixes
-                .iter()
-                .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-                .map(|name| name.as_ref().to_string())
-                .collect::<Vec<_>>();
-            for stream in streams {
-                let filters_path = RelativePathBuf::from(&stream);
-                let filter_bytes = self
-                    .get_objects(
-                        Some(&filters_path),
-                        Box::new(|file_name| file_name.ends_with(".json")),
-                    )
-                    .await?;
-                filters
-                    .entry(filters_path)
-                    .or_default()
-                    .extend(filter_bytes);
-            }
-        }
-        Ok(filters)
-    }
-
-    ///fetch all correlations stored in object store
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
+        Ok(resp
             .common_prefixes
             .iter()
             .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
             .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_correlation_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
-            let correlations_path = RelativePathBuf::from(&user_correlation_path);
-            let correlation_bytes = self
-                .get_objects(
-                    Some(&correlations_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            correlations
-                .entry(correlations_path)
-                .or_default()
-                .extend(correlation_bytes);
-        }
-        Ok(correlations)
+            .collect::<Vec<_>>())
     }
 
     fn get_bucket_name(&self) -> String {