refactor: DRY object_storage #1147

Open · wants to merge 10 commits into base: main

Conversation

@de-sh (Contributor) commented Jan 29, 2025

Fixes #XXXX.

Description

  • Ensure object storage contents aren't unnecessarily copied and redundant code is trimmed away

This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Introduced a unified directory listing capability for managing stored content.
    • Added new timeout settings and improved utilities for consistent path handling.
  • Refactor

    • Removed legacy methods for fetching detailed user data to simplify functionality.
    • Updated file upload procedures for better consistency and maintainability.
    • Consolidated internal dependencies to enhance clarity and streamline operations.

@de-sh de-sh marked this pull request as draft January 29, 2025 21:59
@de-sh de-sh marked this pull request as ready for review February 1, 2025 09:55
@parmesant (Contributor) left a comment

LGTM

@coderabbitai bot commented Feb 12, 2025

Walkthrough

The changes update multiple storage modules by removing functions for fetching user dashboards, saved filters, and correlations and replacing them with a new, unified directory listing function list_dirs_relative. Function signatures in upload methods have been modified to use std::path::Path instead of legacy types. Imports have been reorganized and consolidated, with new dependencies added in several modules. Module visibility and timeout constants have been updated in the main storage module, and the ObjectStorage trait has been enhanced with asynchronous methods for data retrieval and an improved file removal process.

Changes

Files: src/storage/azure_blob.rs, src/storage/localfs.rs, src/storage/s3.rs
Summary: Reorganized imports; updated function signatures to use Path instead of legacy types; removed the functions for dashboards, filters, and correlations and replaced them with a new list_dirs_relative function; eliminated redundant to_object_store_path functionality.

Files: src/storage/mod.rs
Summary: Updated module visibility to public; added new imports (including chrono, serde, and RelativePath-related items); introduced timeout constants; added a new public utility to_object_store_path for converting a RelativePath to a Path (see the sketch below).

Files: src/storage/object_storage.rs
Summary: Enhanced the ObjectStorage trait by adding asynchronous methods for retrieving correlations, dashboards, and saved filters; refactored the file removal process in the upload function.
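For orientation, a minimal sketch of what such a conversion utility could look like; this illustrates the idea and is not necessarily the exact code added in src/storage/mod.rs:

use object_store::path::Path;
use relative_path::RelativePath;

/// Convert a RelativePath into an object_store Path.
/// Sketch only; the helper introduced in this PR may differ in detail.
pub fn to_object_store_path(path: &RelativePath) -> Path {
    Path::from(path.as_str())
}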

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant SP as Storage Provider
    participant DS as Directory Listing Service

    C->>SP: list_dirs_relative(relative_path)
    SP->>DS: Read directory names (from file system/blob storage)
    DS-->>SP: Return list of directory names
    SP-->>C: Return Vec<String> directory list

Poem

I'm a rabbit with a joyful hop,
Watching old functions gracefully drop.
New paths and listings lead the way,
In streamlined code, I frolic all day.
With imports tidy and changes so neat,
My code garden blooms in every beat! 🐇
Happy leaps and bounds in every street!


@coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
src/storage/s3.rs (1)

515-555: 🛠️ Refactor suggestion

Clean up the commented multipart upload code.

The commented code contains references to removed functionality and should be either:

  1. Updated to use the current API (preferred)
  2. Removed if no longer needed
🧹 Nitpick comments (7)
src/storage/localfs.rs (1)

361-380: Consider adding a dedicated unit test.
This new list_dirs_relative function is straightforward, but adding a test case for an empty or non-existent directory would help ensure correctness and guard against edge cases.
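A minimal sketch of such a test, assuming tokio and tempfile are available as dev-dependencies and a hypothetical LocalFS::new constructor (the real setup may differ):

#[tokio::test]
async fn list_dirs_relative_handles_missing_dir() {
    // Hypothetical setup: point LocalFS at a fresh temp directory.
    let root = tempfile::tempdir().unwrap();
    let fs = LocalFS::new(root.path().to_path_buf()); // assumed constructor

    // Listing a non-existent directory should not panic; depending on the
    // chosen semantics it may return Ok(vec![]) or a not-found error.
    let result = fs
        .list_dirs_relative(&relative_path::RelativePathBuf::from("no/such/dir"))
        .await;
    assert!(result.map(|dirs| dirs.is_empty()).unwrap_or(true));
}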

src/storage/azure_blob.rs (1)

668-681: Validate return of common_prefixes.
The implementation of list_dirs_relative using list_with_delimiter is correct for directory-level listings. Ensure that calling code expects only top-level subdirectories (deeper path segments are not returned).
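For reference, a blob-backed implementation along these lines might look like the following sketch (method and field names from recent object_store releases; the exact integration into the trait is assumed):

// Sketch: list only the immediate subdirectories under a relative prefix.
// list_with_delimiter stops at the first delimiter past the prefix, so
// deeper path segments are never enumerated.
async fn list_dirs_relative(
    store: &dyn object_store::ObjectStore,
    relative_path: &relative_path::RelativePathBuf,
) -> Result<Vec<String>, object_store::Error> {
    let prefix = object_store::path::Path::from(relative_path.as_str());
    let listing = store.list_with_delimiter(Some(&prefix)).await?;
    Ok(listing
        .common_prefixes
        .iter()
        .filter_map(|p| p.filename().map(String::from))
        .collect())
}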

src/storage/object_storage.rs (4)

125-148: Slight inconsistency in path building for dashboards.
Here, the path is built via string concatenation and converted back to a RelativePathBuf. Consider using a consistent approach (e.g., RelativePathBuf::from_iter([USERS_ROOT_DIR, user, DASHBOARDS_DIR])) for clarity and uniformity.
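To illustrate the two styles (the constant values here are assumptions, for illustration only):

use relative_path::RelativePathBuf;

const USERS_ROOT_DIR: &str = "users";      // assumed value
const DASHBOARDS_DIR: &str = "dashboards"; // assumed value

fn dashboard_path(user: &str) -> RelativePathBuf {
    // String concatenation: separators are handled by hand and easy to get wrong.
    let _concatenated = RelativePathBuf::from(format!("{USERS_ROOT_DIR}/{user}/{DASHBOARDS_DIR}"));

    // from_iter: components stay separate and no manual "/" handling is needed.
    RelativePathBuf::from_iter([USERS_ROOT_DIR, user, DASHBOARDS_DIR])
}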


150-175: Maintain consistent path handling.
Similar to dashboards, building correlation paths involves mixing string-based .from(format!(...)) and RelativePathBuf::from(...). Using a single pattern (e.g., RelativePathBuf::from_iter) would strengthen clarity and reduce the chance of mistakes.


776-776: Ignoring removal errors may mask issues.
This line discards any I/O error if file removal fails. If preserving reliability is crucial, consider logging or handling this error.


783-783: Same concern regarding file removal.
As above, consider logging the potential error instead of quietly ignoring it.
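A minimal sketch of surfacing the failure instead of discarding it, assuming the file is removed via std::fs::remove_file and that tracing is already imported in this module:

// Instead of discarding the result:
//     let _ = std::fs::remove_file(&path);
// log the failure so leaked staging files are visible in the logs.
if let Err(e) = std::fs::remove_file(&path) {
    tracing::warn!("failed to remove staged file {}: {e}", path.display());
}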

src/storage/s3.rs (1)

486-512: Address the multipart upload TODO.

The multipart upload functionality is currently disabled with a TODO comment. This could impact performance when uploading large files.

Would you like me to help implement the multipart upload functionality? I can provide a solution that follows AWS best practices for handling large file uploads.
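As a non-committal starting point: recent object_store releases (0.10 and later) ship a WriteMultipart helper that drives part uploads; a sketch under that assumption, with an illustrative 5 MiB part size:

use object_store::{path::Path, ObjectStore, WriteMultipart};

// Sketch: upload a large payload to object storage in parts.
// Assumes object_store >= 0.10, where put_multipart returns a
// MultipartUpload that WriteMultipart can drive.
async fn upload_multipart(
    store: &dyn ObjectStore,
    key: &Path,
    bytes: &[u8],
) -> object_store::Result<()> {
    let upload = store.put_multipart(key).await?;
    let mut writer = WriteMultipart::new(upload);
    for chunk in bytes.chunks(5 * 1024 * 1024) {
        writer.write(chunk); // buffers and dispatches part uploads
    }
    writer.finish().await?; // completes the multipart upload
    Ok(())
}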

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 59a82d2 and 6894b63.

📒 Files selected for processing (5)
  • src/storage/azure_blob.rs (6 hunks)
  • src/storage/localfs.rs (3 hunks)
  • src/storage/mod.rs (4 hunks)
  • src/storage/object_storage.rs (4 hunks)
  • src/storage/s3.rs (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (15)
src/storage/localfs.rs (3)

20-20: No concerns with added HashSet import.
The HashSet type aligns with usage in functions like list_streams.


30-30: Futures imports look appropriate.
No issues detected with the addition of concurrency-related imports.


37-39: Import reorganization looks fine.
These changes pose no functional or stylistic concerns.

src/storage/azure_blob.rs (2)

18-57: Import adjustments appear reasonable.
All newly introduced items (e.g., Arc, Instant, path usage) seem relevant to the changed code.


170-170: No issues with the registry usage.
Registering the object store with the default registry remains consistent with the rest of the code.

src/storage/object_storage.rs (4)

19-28: New imports appear relevant.
They align with new methods that handle dashboards, filters, correlations, and file I/O.


39-40: No concerns with logging imports.
Introducing info, error, and warn from tracing is beneficial for detailed logging.


43-56: Additional crate references are valid.
The newly referenced modules (e.g., correlation, alerts) align with the new methods below.


95-98: Trait signature for list_dirs_relative.
Defining this method in the trait ensures a uniform directory-listing approach across all storage backends.
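From the sequence diagram above and the diff further down, the trait addition presumably looks roughly like this sketch (the async_trait attribute and exact parameter type are inferred; ObjectStorageError is the crate's existing error type):

#[async_trait::async_trait]
pub trait ObjectStorage {
    // ...existing methods elided...

    /// List the names of directories directly under `relative_path`,
    /// implemented per backend (local FS, S3, Azure Blob).
    async fn list_dirs_relative(
        &self,
        relative_path: &relative_path::RelativePathBuf,
    ) -> Result<Vec<String>, ObjectStorageError>;
}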

src/storage/mod.rs (4)

19-22: LGTM!

The new imports are well-organized and necessary for the enhanced path handling functionality.


37-37: LGTM!

Making the module public aligns with the PR's DRY objectives by allowing code reuse across crates.


267-269: LGTM!

The utility function provides a clean abstraction for path conversion, promoting consistent path handling across the codebase.


81-82: Verify the timeout values against AWS best practices.

The timeout values seem reasonable, but let's verify them against AWS recommendations:

  • Connection timeout: 5 seconds
  • Request timeout: 300 seconds (5 minutes)
✅ Verification successful

🌐 Web query:

What are the recommended timeout values for AWS S3 SDK operations?

💡 Result:

AWS S3 SDK timeout recommendations vary by use case and SDK, but general best practices emphasize balancing fail-fast behavior with sufficient time for large operations. Key guidelines include:

General Timeout Strategies

  • Large object transfers: Increase total timeout (timeout, operation_timeout) while keeping per-attempt limits low
    Example: iOS SDK sets timeout=600s for large files but connection_timeout=60s [1]
  • Latency-sensitive apps:
    • Retry GET/PUT after 2 seconds with exponential backoff (+4 sec per retry) [9]
    • Track & retry slowest 5% of large requests (>128MB) [9]
  • Avoid idle connections:
    • VPC endpoints terminate idle connections after 350 seconds[6]

SDK-Specific Configurations

  • Java: apiCallTimeout + apiCallAttemptTimeout; total <15 min, per-attempt ~1.5-5 sec [4][9]
  • .NET: Timeout + ReadWriteTimeout; defaults 100/300 sec, raise to the maximum for large uploads [7]
  • Rust: operation_timeout + connect_timeout; total ~5 sec, connect ~3 sec [2]
  • C++: RequestTimeout; increase from the default 3 sec [5][10]
  • iOS: timeout + connectionTimeout; large files 600 s, fail-fast 60 s [1]

Retry Optimization

// Node.js Lambda example using remaining time
req.setTimeout(context.getRemainingTimeInMillis() - 500); // Reserve 500ms 
  • Set integration timeouts shorter than function timeout (e.g., API Gateway:29sec vs Lambda:3-6sec)[3]
  • Use circuit breakers when exceeding downstream limits[9]

For high-throughput scenarios using KMS encryption, verify KMS request quotas align with your timeout strategy[9]. Monitor via CloudWatch/X-Ray to fine-tune values[3][9].


Timeout Values Review: AWS Best Practices Comparison

  • Connection Timeout (5 seconds): While some AWS SDK documentation for Rust suggests a connect timeout closer to ~3 seconds for standard operations, a 5-second timeout can be acceptable if your environment anticipates network variability.
  • Request Timeout (300 seconds): A longer request timeout (5 minutes) fits scenarios involving large object transfers or operations that might take longer than typical API calls. This aligns with some SDK configurations (e.g., .NET) and the general advice to use extended timeouts where needed.

Overall, the chosen values are reasonable provided they match your application's use case. If most S3 operations are lightweight, you might consider reducing the connection timeout for fail-fast behavior. Otherwise, if long transfers are common, the 300-second request timeout is justified.
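For context, with the object_store crate these constants would typically be wired in through ClientOptions, roughly as in this sketch (constant names are illustrative):

use std::time::Duration;
use object_store::ClientOptions;

// Illustrative constants mirroring the values discussed above.
const CONNECT_TIMEOUT_SECS: u64 = 5;
const REQUEST_TIMEOUT_SECS: u64 = 300;

fn client_options() -> ClientOptions {
    ClientOptions::new()
        .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
        .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS))
}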

src/storage/s3.rs (2)

19-58: LGTM!

The imports are well-organized, logically grouped, and necessary for the enhanced functionality.


798-811: LGTM!

The new list_dirs_relative method provides a clean, reusable implementation for directory listing, aligning well with the PR's DRY objectives.

Comment on lines 100 to +123

     async fn get_all_saved_filters(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
+            for stream in self.list_dirs_relative(&stream_dir).await? {
+                let filters_path = RelativePathBuf::from(&stream);
+                let filter_bytes = self
+                    .get_objects(
+                        Some(&filters_path),
+                        Box::new(|file_name| file_name.ends_with(".json")),
+                    )
+                    .await?;
+                filters
+                    .entry(filters_path)
+                    .or_default()
+                    .extend(filter_bytes);
+            }
+        }
+        Ok(filters)
+    }
⚠️ Potential issue

Potential path construction bug for saved filters.
Within get_all_saved_filters, the path components for each stream folder are combined incorrectly. Currently, the code does not preserve the intermediate directory hierarchy under USERS_ROOT_DIR/<user>. As a result, calls to get_objects may target a path that doesn’t reflect the intended subdirectory structure.

Below is an example fix where each found directory is appended properly:

 for user in self.list_dirs_relative(&users_dir).await? {
     let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
     for stream in self.list_dirs_relative(&stream_dir).await? {
-        let filters_path = RelativePathBuf::from(&stream);
+        let filters_path = stream_dir.join(&RelativePathBuf::from(&stream));

         let filter_bytes = self
             .get_objects(
                 Some(&filters_path),
                 Box::new(|file_name| file_name.ends_with(".json")),
             )
             .await?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

     async fn get_all_saved_filters(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
         let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();

         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
             let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
             for stream in self.list_dirs_relative(&stream_dir).await? {
-                let filters_path = RelativePathBuf::from(&stream);
+                let filters_path = stream_dir.join(&RelativePathBuf::from(&stream));
                 let filter_bytes = self
                     .get_objects(
                         Some(&filters_path),
                         Box::new(|file_name| file_name.ends_with(".json")),
                     )
                     .await?;
                 filters
                     .entry(filters_path)
                     .or_default()
                     .extend(filter_bytes);
             }
         }
         Ok(filters)
     }
