refactor+test: datatype updation #1262

Open

de-sh wants to merge 4 commits into base: main

Conversation

de-sh (Contributor) commented Mar 21, 2025

Fixes #XXXX.

Description

All kinds of JSON/format-specific handling should not leak into the generic event-handling code. This PR aims to ensure we keep that distinction. We also add tests to validate the behavior of the affected code.


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Enhanced JSON event processing to automatically adjust field types, converting valid date strings to timestamps and numeric values to a floating-point representation.
  • Refactor
    • Streamlined the event schema generation and record processing interfaces for improved performance and clarity.
    • Simplified the logic in the delete and detect_schema functions for better readability and efficiency.
  • Tests
    • Added comprehensive tests to verify that time-based and numeric fields are correctly inferred and updated based on log content and schema version.


coderabbitai bot commented Mar 21, 2025

Walkthrough

This change updates the JSON event processing and schema inference logic. A new static array for time-related field names and a function to override inferred data types have been introduced. The modifications simplify the EventFormat interface by removing the time_partition parameter and updating schema handling to use mutable references to avoid unnecessary cloning. Test cases have been added to validate the new behavior. Minor adjustments in the HTTP logstream handler improve import usage and schema initialization.
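
For illustration, the per-record flow can be sketched as below; this is a rough sketch based on the snippet quoted later in this review, and the exact call sites in the PR may differ.

use arrow_json::reader::infer_json_schema_from_iterator;

// Sketch: infer a schema from all records, then let each record refine it in place.
// The `?` assumes a surrounding function that returns a Result.
let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok))?;
for log_record in log_records.iter() {
    override_inferred_data_type(&mut schema, log_record, SchemaVersion::V1);
}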

Changes

Files changed and summary:

  • src/event/format/json.rs: Added the TIME_FIELD_NAME_PARTS static array and the new function override_inferred_data_type; updated the to_data method to iterate over log records and adjust field types based on content; augmented test cases to verify type adjustments.
  • src/event/format/mod.rs: Removed the time_partition parameter from the to_data and into_recordbatch method signatures; updated function signatures (get_existing_field_names, override_existing_timestamp_fields, update_field_type_in_schema) to use mutable references for in-place schema updates, reducing cloning.
  • src/handlers/http/logstream.rs: Updated imports to use remove_dir_all directly; replaced override_data_type with override_inferred_data_type; simplified the schema initialization in detect_schema by removing unneeded Arc wrapping and iterating over log records with updated function calls.

Sequence Diagram(s)

sequenceDiagram
    participant L as Log Record
    participant E as EventFormat
    participant O as override_inferred_data_type
    participant S as Schema

    L->>E: Pass log record for processing
    E->>O: Call override_inferred_data_type for each record
    O->>S: Check and update field type based on value

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

I'm a little rabbit with a hop so spry,
Leaping through code as the schema flows by.
With new functions and arrays in the mix,
Logs transform swiftly—no more old tricks!
Celebrating clean code under the digital sky 🐇.


coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/handlers/http/logstream.rs (1)

61-61: Consider error propagation when directory deletion fails.
Currently, the deletion error is only logged. Depending on the use case, returning or handling this error might prevent inconsistent states.
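
For illustration, the two options might look like this; a sketch only, where the directory variable, error type, and choice of std vs tokio remove_dir_all are assumptions rather than the PR's actual code.

// Current pattern (sketch): the failure is only logged and execution continues.
if let Err(err) = remove_dir_all(&stream_dir) {
    error!("failed to remove stream directory: {err}");
}

// Alternative (sketch): propagate the failure to the caller instead.
remove_dir_all(&stream_dir).map_err(|err| StreamError::Custom {
    msg: format!("failed to remove stream directory: {err}"),
    status: StatusCode::INTERNAL_SERVER_ERROR,
})?;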

src/event/format/json.rs (1)

36-48: Time-related field name patterns.
Defining TIME_FIELD_NAME_PARTS as a static array is a clear, centralized approach, though it may be worth revisiting periodically to include additional time-like indicators.
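
For illustration, such an array might look like the following; the entries here are assumptions, and the actual list in src/event/format/json.rs may differ.

// Illustrative only: name fragments that mark a field as time-related.
static TIME_FIELD_NAME_PARTS: [&str; 4] = ["time", "date", "timestamp", "created"];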

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8ac3105 and d83f54b.

📒 Files selected for processing (3)
  • src/event/format/json.rs (5 hunks)
  • src/event/format/mod.rs (6 hunks)
  • src/handlers/http/logstream.rs (3 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/handlers/http/logstream.rs (1)
src/event/format/json.rs (1)
  • override_inferred_data_type (237-279)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (23)
src/handlers/http/logstream.rs (4)

19-19: Use remove_dir_all directly.
This direct import makes the code more explicit and is aligned with Rust conventions.


27-27: Importing StreamError.
Introducing StreamError from error is consistent and improves error handling readability.


32-43: Expanded imports for schema inference and hot tier management.
These new imports effectively consolidate references to schema logic (override_inferred_data_type) and hot tier utilities, fostering better code organization.


45-45: Importing additional cluster utilities.
Pulling in IngestionStats, QueriedStats, and StorageStats lays groundwork for potential cluster-based operations.

src/event/format/json.rs (7)

25-25: New TimeUnit import.
Bringing TimeUnit into scope makes timestamp conversions straightforward.


235-279: override_inferred_data_type function logic.
This implementation robustly identifies likely timestamp and numeric fields. The explicit checks for RFC3339 and RFC2822 are especially helpful. Watch out for potential performance overhead if log volumes are extremely large, but otherwise this is a neat approach.
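
The heart of that check is roughly the following sketch (not the exact match arm from the PR; see also the format-extension diff suggested further down).

// A Utf8 field becomes a timestamp candidate when its name looks time-related
// and the sampled value parses as RFC3339 or RFC2822 (via chrono).
let looks_like_time = TIME_FIELD_NAME_PARTS
    .iter()
    .any(|part| field_name.to_lowercase().contains(part));
let parses_as_datetime =
    DateTime::parse_from_rfc3339(s).is_ok() || DateTime::parse_from_rfc2822(s).is_ok();
if looks_like_time && field.data_type() == &DataType::Utf8 && parses_as_datetime {
    // rebuild the field with a Timestamp data type
}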


475-498: Test: updates_field_type_to_timestamp_for_time_related_fields.
This test increases confidence that the code properly updates string fields to timestamp types when matching time-related names. Good addition to coverage.
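
A minimal sketch of what such a test can look like; the concrete field names, chosen TimeUnit, and assertions in the PR may differ.

use arrow_schema::{DataType, Field, Schema};
use serde_json::json;

#[test]
fn string_time_field_is_promoted_to_timestamp() {
    // A single Utf8 field whose name contains a time-related fragment.
    let mut schema = Schema::new(vec![Field::new("created_time", DataType::Utf8, true)]);
    let log_record = json!({ "created_time": "2025-03-21T12:00:00Z" });

    override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);

    // The field should no longer be Utf8 but some Timestamp variant.
    assert!(matches!(
        schema.field_with_name("created_time").unwrap().data_type(),
        DataType::Timestamp(_, _)
    ));
}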


500-522: Test: update_field_type_to_timestamp_for_rfc2822_format.
Verifying RFC2822 string fields convert to timestamps helps ensure compatibility with multiple date/time string styles.


524-543: Test: update_numeric_fields_to_float64.
This confirms numeric fields get coerced to Float64 in SchemaVersion::V1, covering a crucial type inference path.


545-564: Test: handles_numeric_fields_already_float64.
Ensuring that pre-existing Float64 fields remain unaltered is a valuable safeguard, preventing accidental type downgrades.


566-585: Test: does_not_update_field_type_for_v0_schema_version.
Correctly verifying that no changes occur for older schema versions solidifies backward compatibility.

src/event/format/mod.rs (12)

137-138: Updated method call without time_partition.
Removing the time_partition parameter from to_data simplifies the interface.


148-148: Creating a new schema from fields.
Instantiating Schema::new(schema) is straightforward; no issues found.


152-153: Updating inferred schema.
update_field_type_in_schema integrates well here, applying needed transformations in place before finalizing.


162-162: Switched to using a borrowed &Schema.
Referencing instead of owning the schema can reduce memory overhead, particularly important for large schemas.


199-199: get_existing_field_names signature change.
Operating on &Schema is consistent with the rest of the codebase’s shift toward borrowing.


218-219: override_existing_timestamp_fields now takes &mut Schema.
Modifying the schema in place removes the need for extra cloning or re-allocation.


231-231: Rewriting fields in-place.
The in-place assignment ensures changes are reflected immediately, but watch out for concurrency concerns in future expansions.
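
The pattern in question is roughly the following sketch; the real closure carries the type-override logic discussed elsewhere in this review.

// Rebuild the field list on the borrowed schema instead of allocating a new Schema.
schema.fields = schema
    .fields()
    .iter()
    .map(|field| {
        // decide here whether the field keeps its type or gets a new one
        field.clone()
    })
    .collect();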


249-249: update_field_type_in_schema takes &mut Schema.
More seamless handling than returning a new schema.


252-253: Call to get_existing_field_names.
Helps maintain consistency with existing fields, ensuring correct merges.


256-257: Overriding known timestamp fields.
By re-checking existing schema data, we preserve previously established field types.


260-260: Using Rust’s let-else pattern.
Neat way to handle the optional time partition. Improves code clarity.
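
For readers unfamiliar with let-else, a small illustration; the variable name is a placeholder rather than the PR's.

// let-else: bind the value when present, otherwise take the early-return path.
let Some(time_partition) = time_partition else {
    return;
};
// from here on, `time_partition` is the unwrapped value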


263-263: Modifying inferred_schema.fields in place again.
Continues the pattern of direct schema updates, consistent with the rest of the refactor.

Comment on lines +117 to +121
let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
for log_record in log_records.iter() {
    override_inferred_data_type(&mut schema, log_record, SchemaVersion::V1);
}

🛠️ Refactor suggestion

Avoid .unwrap() to prevent panics.
Using .unwrap() to infer schema can cause runtime panics for invalid JSON inputs. Consider returning an error or using the ? operator for more robust error handling.

- let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
+ let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok))
+     .map_err(|err| StreamError::Custom {
+         msg: format!("Failed to infer JSON schema: {}", err),
+         status: StatusCode::BAD_REQUEST
+     })?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (resulting code)

let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok))
    .map_err(|err| StreamError::Custom {
        msg: format!("Failed to infer JSON schema: {}", err),
        status: StatusCode::BAD_REQUEST,
    })?;
for log_record in log_records.iter() {
    override_inferred_data_type(&mut schema, log_record, SchemaVersion::V1);
}

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/event/format/json.rs (3)

235-279: Schema inference enhancement with type-specific overrides.

The new override_inferred_data_type function intelligently updates field types based on content and naming patterns, which improves data quality and query performance. However, there's room for improvement:

Consider adding support for more timestamp formats beyond RFC3339 and RFC2822. Many log systems use custom formats that wouldn't be detected by the current implementation. You could add support for common formats like ISO8601 variants or consider using a more flexible timestamp parsing library.

 (SchemaVersion::V1, Some(Value::String(s)))
     if TIME_FIELD_NAME_PARTS
         .iter()
         .any(|part| field_name.to_lowercase().contains(part))
         && field.data_type() == &DataType::Utf8
-        && (DateTime::parse_from_rfc3339(s).is_ok()
-            || DateTime::parse_from_rfc2822(s).is_ok()) =>
+        && (DateTime::parse_from_rfc3339(s).is_ok()
+            || DateTime::parse_from_rfc2822(s).is_ok()
+            || NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").is_ok()
+            || NaiveDateTime::parse_from_str(s, "%d/%m/%Y %H:%M:%S").is_ok()) =>

239-241: Consider adding schema version validation.

The function accepts any schema version but only applies changes for SchemaVersion::V1. While the implementation handles this correctly, it would be clearer to validate or document this constraint at the function entry point.

 pub fn override_inferred_data_type(
     schema: &mut Schema,
     log_record: &Value,
     schema_version: SchemaVersion,
 ) {
+    // Early return if schema version doesn't support type overrides
+    if schema_version != SchemaVersion::V1 {
+        return;
+    }
     let Value::Object(map) = log_record else {
         return;
     };

246-278: Potential performance optimization for large schemas.

The current implementation rebuilds the entire schema fields collection for each log record. For large schemas or many records, this could be inefficient.

Consider optimizing to only process fields that are present in the current log record:

-    schema.fields = schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let field_name = field.name().as_str();
-            match (schema_version, map.get(field.name())) {
+    let fields_to_process: Vec<_> = schema
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(idx, field)| map.get(field.name()).map(|val| (idx, field, val)))
+        .collect();
+
+    for (idx, field, value) in fields_to_process {
+        let field_name = field.name().as_str();
+        let new_field = match (schema_version, value) {

This would only process fields that exist in the current log record, potentially saving work for large schemas.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d83f54b and be0b31a.

📒 Files selected for processing (1)
  • src/event/format/json.rs (5 hunks)
🔇 Additional comments (4)
src/event/format/json.rs (4)

25-25: Appropriate import update.

The import statement correctly adds the necessary types from arrow_schema that are used in the new code.


36-48: Good abstraction for time-related field detection.

Creating a static array for time-related field name parts improves maintainability and makes the code more readable. This approach allows for easy expansion of recognized time field patterns in the future.


104-106: Improved schema inference with dynamic type adjustments.

The loop that calls override_inferred_data_type for each log record is a good approach for dynamically adjusting the schema based on actual data content. This is more flexible than the previous implementation.


476-601: Comprehensive test coverage for the new functionality.

The tests are well-structured and cover various scenarios including:

  • Different timestamp formats (RFC3339, RFC2822)
  • Numeric field conversion
  • Non-standard time strings
  • Different schema versions

This thorough testing approach ensures the new functionality works correctly and helps prevent regressions in the future.

Labels: none yet
Projects: none yet
Issues that may be closed by merging: none yet
1 participant