-
-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: capture ingestion time at receive #1210
Conversation
WalkthroughThis change series introduces a new timestamp parameter across several modules. The modifications update method signatures in key functions such as Changes
Sequence Diagram(s)sequenceDiagram
participant IngestHandler as HTTP Ingest Handler
participant KafkaProcessor as Kafka Processor
participant EventFormat as Event Format Module
participant ArrowUtil as Arrow Utility
IngestHandler->>KafkaProcessor: Invoke build_event_from_chunk (includes Utc::now())
KafkaProcessor->>EventFormat: Call into_recordbatch(p_timestamp = Utc::now(), ...)
EventFormat->>ArrowUtil: Request get_timestamp_array(p_timestamp, num_rows)
ArrowUtil-->>EventFormat: Return timestamp array
EventFormat-->>KafkaProcessor: Return RecordBatch with timestamp data
KafkaProcessor-->>IngestHandler: Return processed event batch
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/connectors/kafka/processor.rs (1)
77-87
: Consider using the same timestamp instance throughout the function.While the timestamp for the record batch is now passed explicitly at line 71, a separate timestamp is still generated for
parsed_timestamp
at line 83. For better consistency, consider using the same timestamp instance for both places.+let now = Utc::now(); let (rb, is_first) = batch_json_event.into_recordbatch( &schema, - Utc::now(), + now, static_schema_flag, time_partition.as_ref(), schema_version, )?; let p_event = ParseableEvent { rb, stream_name: stream_name.to_string(), origin_format: "json", origin_size: total_payload_size, is_first_event: is_first, - parsed_timestamp: Utc::now().naive_utc(), + parsed_timestamp: now.naive_utc(), time_partition: None, custom_partition_values: HashMap::new(), stream_type: StreamType::UserDefined, };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/connectors/kafka/processor.rs
(1 hunks)src/event/format/mod.rs
(3 hunks)src/handlers/http/ingest.rs
(14 hunks)src/handlers/http/modal/utils/ingest_utils.rs
(4 hunks)src/utils/arrow/mod.rs
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (24)
src/connectors/kafka/processor.rs (1)
69-75
: Updated timestamp handling matches new method signature.The code now correctly passes the current timestamp from
Utc::now()
to theinto_recordbatch
method, aligning with the updated signature in theEventFormat
trait.src/handlers/http/modal/utils/ingest_utils.rs (4)
99-99
: Captures ingestion timestamp at a single point.Storing the current time in a variable at the start ensures consistent timestamp usage throughout the log processing flow.
125-125
: Uses the captured timestamp for consistency.Uses the previously captured timestamp rather than generating a new one, ensuring the same timestamp is used consistently.
146-152
: Passes the captured timestamp to into_event_batch.Correctly updates the function call to pass the timestamp parameter, ensuring consistency with the updated method signature.
170-184
: Updates function signature to accept timestamp parameter.The function signature has been properly updated to accept a
DateTime<Utc>
parameter, and it correctly passes this timestamp to the underlyinginto_recordbatch
method.src/utils/arrow/mod.rs (3)
136-138
: Function signature enhanced to accept explicit timestamp.The
get_timestamp_array
function now accepts aDateTime<Utc>
parameter instead of always using the current time, providing more flexibility and ensuring consistent timestamp usage across the codebase.
197-206
: Tests updated to match new function signature.The tests have been properly updated to work with the new function signature, including creating a timestamp variable and comparing against its value.
210-212
: Zero-size test updated for new signature.This test has been properly updated to match the new function signature.
src/event/format/mod.rs (2)
108-115
: Method signature updated to accept timestamp parameter.The
into_recordbatch
method now accepts ap_timestamp
parameter, allowing the ingestion time to be captured at receipt and consistently used throughout processing.
145-150
: Uses provided timestamp for array generation.Updated to pass the provided timestamp to
get_timestamp_array
rather than generating a new one, ensuring consistency in timestamp usage.src/handlers/http/ingest.rs (14)
82-82
: Good improvement - capturing timestamp earlier in the processAdding a timestamp capture at the beginning of the function ensures consistency throughout the processing pipeline. This aligns with the PR objective of capturing ingestion time at receipt.
96-96
: Consistent timestamp propagationAppropriate modification to pass the captured timestamp to the
into_recordbatch
method, ensuring the same timestamp is used throughout the processing pipeline.
104-104
: Using consistent timestamp referenceGood change to use the previously captured timestamp instead of generating a new one. This maintains temporal consistency between record creation and event processing.
396-404
: Updated test function signature to match implementation changesThe
into_event_batch
test call has been properly updated to include the timestamp parameter, ensuring the tests reflect the new implementation requirements.
430-438
: Consistent test updatesTest cases have been consistently updated to include the timestamp parameter in all calls to
into_event_batch
.
468-469
: Aligned test with implementationTest case correctly updated to include timestamp parameter.
499-501
: Test assertion updated correctlyError checking test case properly updated to include timestamp in method call.
517-518
: Test consistency maintainedEmpty object test correctly updated with timestamp parameter.
557-565
: Array conversion test updatedArray to recordbatch test properly updated to include timestamp parameter.
611-619
: Test with null values updated correctlyTest with null values in arrays properly updated with timestamp parameter.
666-667
: Schema derivation test updatedTest with schema derivation properly updated to include timestamp parameter.
714-716
: Schema mismatch test updatedSchema mismatch error test properly updated with timestamp parameter.
757-757
: Nested type test updatedNested array object test properly updated to include timestamp parameter.
846-846
: Schema version V1 test updatedTest with SchemaVersion::V1 properly updated to include timestamp parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good to merge
Currently we are capturing ingestion time in many disjoint places, with this PR we bring it to just one point, right after data is received.
Currently we are capturing ingestion time in many disjoint places, with this PR we bring it to just one point, right after data is received.
Fixes #XXXX.
Description
Currently we are capturing ingestion time in many disjoint places, with this PR we bring it to just one point, right after data is received.
This PR has:
Summary by CodeRabbit