
fix: parallelise datasets API #1276

Merged

Conversation

@nikhilsinhaparseable (Contributor) commented Mar 28, 2025

  • Process each stream in parallel
  • Process distinct counts in parallel

Summary by CodeRabbit

  • Refactor

    • Streamlined dataset retrieval with concurrent processing for faster response times.
    • Enhanced error handling for quicker identification and filtering of problematic data.
  • New Features

    • Introduced additional validation for stream availability and security, ensuring only valid, authorized data is returned.
    • Added new methods for improved functionality in dataset processing and response building.

coderabbitai bot commented Mar 28, 2025

Walkthrough

This change refactors the get_datasets method in the PrismDatasetRequest implementation. The method now processes streams concurrently using futures::future::join_all instead of sequentially, and introduces helper methods for authorization checks, stream existence verification, dataset response building, and retrieval of hot tier info and counts. Error handling is improved: failures are logged without interrupting the processing of other streams.

Changes

File(s) Change Summary
src/prism/logstream/mod.rs Refactored get_datasets to process streams concurrently via futures::future::join_all and added helper methods: process_stream, is_authorized, stream_exists, build_dataset_response, get_hot_tier_info, and get_counts to manage stream verification and error handling.

Possibly related PRs

  • feat: prism post datasets API #1236: Introduced the post_datasets function that drives PrismDatasetRequest, the same type whose get_datasets method this PR refactors.
  • fix: datasets API edgecase where stream is not hottiered #1252: Improved error handling around the get_hot_tier call inside get_datasets, the same method this PR restructures into helper functions.

Suggested labels

for next release

Suggested reviewers

  • praveen5959
  • de-sh

Poem

Hopping through code with a speedy swing,
I watch the concurrent streams take wing.
With helpers stepping in one by one,
Every check shines like a bright sun.
I'm a little rabbit, thrilled by what’s begun!
🥕🐇


coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (6)
src/prism/logstream/mod.rs (6)

249-249: Consider concurrency limits for large stream lists.

If the system has a large number of streams, listing them all and processing concurrently can overwhelm resources. You may consider chunking or otherwise bounding concurrency to prevent resource exhaustion.


253-259: Use bounded concurrency when needed.

futures::future::join_all polls every future at once. If self.streams grows large, this can spawn an excessive number of concurrent tasks. Consider capping concurrency with futures::stream::iter(...).buffer_unordered(...) (note that buffer_unordered comes from the futures::StreamExt trait, which must be in scope).

- let results = futures::future::join_all(
-     self.streams
-         .iter()
-         .map(|stream| self.process_stream(stream.clone(), key.clone())),
- ).await;
+ let results = futures::stream::iter(
+     self.streams.iter().map(|stream| {
+         self.process_stream(stream.clone(), key.clone())
+     })
+ )
+ .buffer_unordered(10) // or any appropriate limit
+ .collect::<Vec<_>>()
+ .await;

273-294: Simplify function signature for clarity.

process_stream returns Option<Result<...>>, which can be confusing. A more direct approach might be returning Result<Option<PrismDatasetResponse>, PrismLogstreamError> to unify error handling and optional logic.

- async fn process_stream(&self, stream: String, key: SessionKey) 
-     -> Option<Result<PrismDatasetResponse, PrismLogstreamError>> {
+ async fn process_stream(&self, stream: String, key: SessionKey) 
+     -> Result<Option<PrismDatasetResponse>, PrismLogstreamError> {
    // Then return Ok(None) if unauthorized or non-existent,
    // and return Ok(Some(...)) or Err(...) otherwise.
}
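As a std-only sketch of the suggested shape (the `DatasetResponse` and `ProcessError` types here are illustrative stand-ins, not the crate's real types), `Result<Option<T>, E>` lets the caller treat `Ok(None)` as a silent skip and `Err` as a loggable failure, with no nested matching on `Option<Result<...>>`:

```rust
// Illustrative sketch of the Result<Option<T>, E> pattern suggested above.
// DatasetResponse and ProcessError are hypothetical stand-ins for the
// crate's real response and error types.
#[derive(Debug, PartialEq)]
struct DatasetResponse {
    stream: String,
}

#[derive(Debug)]
struct ProcessError(String);

fn process_stream(stream: &str, authorized: bool) -> Result<Option<DatasetResponse>, ProcessError> {
    if !authorized {
        // Unauthorized or non-existent streams are skipped, not errors.
        return Ok(None);
    }
    if stream.is_empty() {
        // Genuine failures surface as Err and can be logged by the caller.
        return Err(ProcessError("empty stream name".into()));
    }
    Ok(Some(DatasetResponse { stream: stream.to_owned() }))
}

fn main() {
    let streams = [("billing", true), ("audit", false), ("", true)];
    let mut responses = Vec::new();
    for (name, authorized) in streams {
        match process_stream(name, authorized) {
            Ok(Some(resp)) => responses.push(resp),              // collected
            Ok(None) => {}                                       // skipped silently
            Err(e) => eprintln!("error processing '{name}': {e:?}"), // logged, loop continues
        }
    }
    assert_eq!(responses.len(), 1);
    println!("{responses:?}");
}
```

The same three-way match then lives in one place in get_datasets, rather than being split between an outer `Option` check and an inner `Result` check.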

307-314: Clarify the meaning of check_or_load_stream return value.

Currently, check_or_load_stream(stream) returning true indicates the stream does not exist, making the method return false for stream_exists. The naming is counterintuitive. Consider renaming to something like is_stream_missing(...) or reversing its return value for clarity.


316-349: Handle distinct entry retrieval errors more visibly.

Lines 333–334 quietly ignore errors (using .ok()) for IPs/user_agents retrieval. Consider logging or returning partial errors so that administrators can diagnose query failures for distinct fields.
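One way to make those failures visible without changing control flow, sketched with std only (`Result::inspect_err` is stable since Rust 1.76; the fetch function below is a hypothetical stand-in for the real get_distinct_entries):

```rust
// Hypothetical stand-in for get_distinct_entries: succeeds for one field,
// fails for the other, to show both paths.
fn get_distinct_entries(field: &str) -> Result<Vec<String>, String> {
    match field {
        "p_src_ip" => Ok(vec!["10.0.0.1".to_owned()]),
        _ => Err(format!("query failed for field '{field}'")),
    }
}

fn main() {
    // `.ok()` alone silently discards the error; `.inspect_err(...)` logs it
    // first, so administrators can still diagnose failed distinct-field queries
    // while the response logic stays identical.
    let ips = get_distinct_entries("p_src_ip")
        .inspect_err(|e| eprintln!("distinct IPs unavailable: {e}"))
        .ok();
    let user_agents = get_distinct_entries("p_user_agent")
        .inspect_err(|e| eprintln!("distinct user agents unavailable: {e}"))
        .ok();
    assert!(ips.is_some());
    assert!(user_agents.is_none());
}
```

In the actual code the eprintln! calls would presumably be tracing::warn! or debug!, matching the logging approach used elsewhere in the file.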


367-380: Make hardcoded time range and bin count configurable.

Currently, the time range is fixed at "1h" to "now" with 10 bins (lines 370–373). Consider allowing clients to specify these parameters to accommodate various query requirements and reduce potential overhead from unneeded bins.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9cd3809 and 5855693.

📒 Files selected for processing (1)
  • src/prism/logstream/mod.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/prism/logstream/mod.rs (2)

296-305: Authorization check looks good.

This function cleanly logs unauthorized attempts without exposing sensitive details. No issues found here.


351-365: Global manager usage is acceptable.

get_hot_tier_info responsibly handles missing or invalid hot tier data, returning None or the error. The concurrency aspects here look fine.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/prism/logstream/mod.rs (3)

264-273: Consider adding stream context to error logs.

When logging errors, it would be helpful to include the stream name for better debugging context.

-                Err(err) => {
-                    warn!("error: {err}");
-                    continue;
+                Err(err) => {
+                    warn!("Error processing stream '{}': {}", stream, err);
+                    continue;
                 }

313-320: Consider adjusting log level for stream existence checks.

For cases where you're querying all streams and some might legitimately not exist, a warning log level might be too high. Consider using debug level instead, especially if this is an expected condition in some scenarios.

-            warn!("Stream not found: {stream}");
+            debug!("Stream not found: {stream}");

373-386: Document or parameterize the hardcoded time values.

The get_counts method uses hardcoded values for start_time, end_time, and num_bins. Consider making these configurable parameters or documenting why these specific values are used.

 async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
+        // Using last hour of data with 10 bins as default for dashboard visualization
         let count_request = CountsRequest {
             stream: stream.to_owned(),
             start_time: "1h".to_owned(),
             end_time: "now".to_owned(),
             num_bins: 10,
         };

Alternatively, consider parameterizing these values:

-    async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
+    async fn get_counts(
+        &self, 
+        stream: &str,
+        start_time: Option<String>,
+        end_time: Option<String>,
+        num_bins: Option<usize>
+    ) -> Result<CountsResponse, PrismLogstreamError> {
         let count_request = CountsRequest {
             stream: stream.to_owned(),
-            start_time: "1h".to_owned(),
-            end_time: "now".to_owned(),
-            num_bins: 10,
+            start_time: start_time.unwrap_or_else(|| "1h".to_owned()),
+            end_time: end_time.unwrap_or_else(|| "now".to_owned()),
+            num_bins: num_bins.unwrap_or(10),
         };
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5855693 and dc754d8.

📒 Files selected for processing (1)
  • src/prism/logstream/mod.rs (2 hunks)
🔇 Additional comments (6)
src/prism/logstream/mod.rs (6)

27-27: Good addition of the warn module.

Adding the warn module from tracing is appropriate for the improved error handling implemented in this PR.


249-277: Great implementation of concurrent stream processing!

This refactoring successfully implements parallel processing of streams using futures::future::join_all, which aligns perfectly with the PR objective to enhance performance. The error handling has also been significantly improved - now logging errors and continuing processing instead of failing the entire operation.

This change addresses the past review comment about inconsistent error handling.


280-300: Well-structured stream processing method.

Good implementation of the process_stream method with clear separation of concerns - checking authorization, verifying stream existence, and then processing the stream data. The early returns for unauthorized or non-existent streams prevent unnecessary processing.


302-311: Clean authorization check implementation.

The is_authorized method properly encapsulates the authorization logic with good error logging.


322-355: Excellent use of concurrent operations in response building.

The parallel fetching of distinct entries using futures::join! aligns well with the PR's objective of enhancing performance through parallelization. Also good error handling with .ok() on the results.


357-371: Clean implementation of hot tier info retrieval.

Good error handling with proper pattern matching on different error types.

coderabbitai bot previously approved these changes Mar 28, 2025

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
src/prism/logstream/mod.rs (5)

249-251: Clarify fallback logic when streams is empty
When self.streams is empty, it is replaced with the entire list of streams. This behavior may be surprising to users expecting an empty request to return nothing. Consider documenting or confirming this fallback with the caller to ensure it’s intentional.


280-300: Consider surfacing partial errors in process_stream
When a stream is missing or unauthorized, this function returns Ok(None). Users may benefit from more granular logging or a structured report (e.g., which streams failed). The current approach is fine for skipping failed streams, but consider whether partial errors should be communicated more explicitly to the end user.
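If partial errors should reach the caller, one std-only option (types here are illustrative, not the crate's real ones) is to partition per-stream results into successes and named failures, so the response can report which streams were skipped and why:

```rust
// Illustrative: split per-stream results into successes and named failures.
// The (stream, count) payload is a stand-in for PrismDatasetResponse.
fn partition_results(
    results: Vec<(String, Result<u64, String>)>,
) -> (Vec<(String, u64)>, Vec<(String, String)>) {
    let mut datasets = Vec::new();
    let mut failures = Vec::new();
    for (stream, result) in results {
        match result {
            Ok(count) => datasets.push((stream, count)),
            Err(reason) => failures.push((stream, reason)),
        }
    }
    (datasets, failures)
}

fn main() {
    let results = vec![
        ("billing".to_owned(), Ok(42)),
        ("audit".to_owned(), Err("unauthorized".to_owned())),
    ];
    let (datasets, failures) = partition_results(results);
    assert_eq!(datasets, vec![("billing".to_owned(), 42)]);
    assert_eq!(failures, vec![("audit".to_owned(), "unauthorized".to_owned())]);
    // A response type could then carry both `datasets` and `failures`,
    // instead of dropping the failure information on the floor.
}
```

Whether to expose the `failures` list in the API response or only log it is a product decision; the partition itself costs one pass over the results either way.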


302-311: Minimize cloning of SessionKey
The is_authorized method clones the session key each time. Although cheap here, consider passing a reference to avoid unneeded copies if usage increases.


313-320: Rename or remove log message in stream_exists
This method logs a warning if the stream is not found, yet it returns false to indicate the same condition. If repeated calls happen for the same missing stream, you risk spamming logs. Consider promoting this warning to a debug-level log, or rename the function to check_stream_and_log_if_missing to clarify side effects.


373-385: Parameterize the time range
The get_counts function uses a fixed start time of "1h" and end time of "now". If you anticipate different time windows from consumers, consider making these fields configurable within PrismDatasetRequest.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between dc754d8 and d530b72.

📒 Files selected for processing (1)
  • src/prism/logstream/mod.rs (2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Lint, Test and Coverage Report
src/prism/logstream/mod.rs

[error] 271-271: error[E0425]: cannot find value `stream` in this scope

🔇 Additional comments (2)
src/prism/logstream/mod.rs (2)

253-260: Good use of concurrent processing
Using futures::future::join_all for concurrently processing each stream is a solid approach, ensuring the method won’t block on stream processing. The concurrency design aligns well with the goal of parallelizing dataset retrieval.


322-355: Parallel retrieval of distinct field values
Performing two get_distinct_entries calls with futures::join! is efficient. Ensure that this does not introduce excessive overhead for large datasets. Optionally, you can unify or cache these calls if further expansions (e.g., multiple fields) are expected.

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
src/prism/logstream/mod.rs (4)

250-252: Consider filtering out unauthorized streams earlier.
Currently, if streams.is_empty() we pull all streams, but only later skip unauthorized ones in process_stream(). Consider an early filter to avoid generating possibly large unnecessary futures for streams the user cannot access, improving overall performance.


254-261: Limit concurrency if the stream list is large.
Using futures::future::join_all will spawn a task for each stream. If the user can query hundreds or thousands of streams at once, this could create significant overhead. Consider using a bounded concurrency approach like futures::stream::iter(...).for_each_concurrent(...) with a reasonable limit to prevent resource exhaustion.


314-321: Clarify the return value of stream_exists().
The function name suggests it returns true if the stream exists, but it returns false if check_or_load_stream() is true. This can be confusing. Consider renaming to something like stream_missing() or flipping the condition to improve readability.


374-387: Parameterize the time range if needed.
Currently, the start and end times for counts are hardcoded to 1h and now. If the user or other components eventually require flexibility, consider exposing them as optional parameters for this method.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between d530b72 and 7053832.

📒 Files selected for processing (1)
  • src/prism/logstream/mod.rs (2 hunks)
🔇 Additional comments (4)
src/prism/logstream/mod.rs (4)

27-27: No issues noted on this import addition.
This addition of use tracing::warn; is consistent with the logging approach used throughout the file.


262-279: Implementation aligns with the documented behavior.
The loop collects only successful responses and continues after errors, matching the doc comment that individual stream failures won’t halt the entire process. Good job adhering to the requirement.


281-301: Overall concurrency design looks correct.
Each stream is processed in its own future, and errors are handled gracefully. This is consistent with the stated PR objective to parallelize dataset retrieval without blocking on individual stream failures.


303-312: Method naming and usage is intuitive.
is_authorized accurately encodes the RBAC check, returning false when authorization fails. The logging of unauthorized access is appropriate.

@nikhilsinhaparseable nikhilsinhaparseable merged commit b32aa56 into parseablehq:main Mar 28, 2025
14 checks passed