feat: replace StreamReader/Writer with FileReader/Writer
#1193
base: main
Conversation
Actionable comments posted: 2
🧹 Nitpick comments (8)
src/parseable/staging/writer.rs (3)
39-48: Solid introduction of DiskWriter struct.
The design nicely encapsulates file handling in a dedicated struct. However, consider documenting concurrency expectations (e.g., ensuring a single thread accesses DiskWriter at a time).
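A minimal sketch of the kind of note that could be added, assuming DiskWriter stays single-owner; the wording and the placeholder struct below are illustrative, not the actual definition:

/// `DiskWriter` performs no internal synchronization: it is expected to be
/// owned by a single ingest task (or guarded externally, e.g. behind an
/// `RwLock`) so that only one thread appends to the underlying `.part`
/// file at a time.
pub struct DiskWriter; // placeholder: the real struct holds the FileWriter, path, etc.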
91-109: finish method is well-structured.
Renaming the part file to a completed file is clear. Might want to consider behavior on rename failures (e.g., partial cleanup or rollback).
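A small sketch of how rename failures could be surfaced with cleanup; the helper name and the assumption that both paths are available are illustrative, and error conversion into StagingError is left to the caller:

use std::{fs, io, path::Path};

// Hypothetical helper: promote the completed `.part` file to `.arrows`,
// removing the orphaned part file if the rename itself fails.
fn promote_part_file(part_path: &Path, final_path: &Path) -> io::Result<()> {
    if let Err(err) = fs::rename(part_path, final_path) {
        // Best-effort cleanup so a stale `.part` file doesn't linger in staging.
        let _ = fs::remove_file(part_path);
        return Err(err);
    }
    Ok(())
}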
112-114: Generic Writer referencing DiskWriter.
This integrates memory and disk writers nicely. For large codebases, consider making the default row limit configurable or reading it from a central config.
src/parseable/staging/reader.rs (2)
131-131: Test imports updated to reflect file-based I/O.
Looks good. Always ensure an accompanying test for potential edge cases (e.g., partial read failures).
Also applies to: 136-137
155-163: Helper write_file for tests.
Straightforward approach for writing test Arrow files. Possibly add error context or logs if creation fails.
src/parseable/streams.rs (2)
79-79: Hard-coded size for Writer<16384>.
Consider making 16384 a named constant or configuration option to make it clearer and easier to adjust in the future.
373-387: Flushing DiskWriters is appropriate.
Warn logs are correct for failures, and debug logs confirm success. Potentially add a final error if any disk writer fails, or continue as-is if partial successes are allowed.
src/query/stream_schema_provider.rs (1)
236-236: LGTM! Consider adding documentation for time partitioning.
The addition of time partition information to recordbatches_cloned is a good enhancement that aligns with the PR's objectives. Consider adding a code comment explaining how time partitioning affects the record batch cloning process.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/parseable/mod.rs (1 hunks)
src/parseable/staging/reader.rs (7 hunks)
src/parseable/staging/writer.rs (1 hunks)
src/parseable/streams.rs (13 hunks)
src/query/stream_schema_provider.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (11)
src/parseable/staging/writer.rs (2)
22-24: Imports look good!
No significant concerns. These imports properly bring in file operations, buffering, and Arrow's FileWriter.
Also applies to: 29-29, 34-34, 37-38
50-68: Constructor handles part-file creation gracefully.
Error handling is straightforward; keep in mind to propagate context if future debugging is needed.
src/parseable/staging/reader.rs (3)
22-22: Replaced StreamReader with FileReader.
No issues: switching to file-based reading appears straightforward.
Also applies to: 28-28
40-40: Simplifying MergedRecordReader::new can mask partial file read errors.
If one or more files fail to open, they are silently skipped. This might be intentional, but consider returning a Result or warning about partial successes, so upstream code can handle it.
Also applies to: 44-44, 54-54
167-172: Test coverage for various row scenarios.
Kudos for checking empty, single-row, and multi-row records. The logic ensures FileReader is working as expected. No pressing issues here.
Also applies to: 180-184, 193-224
src/parseable/streams.rs (5)
43-43: Importing MergedRecordReader, DiskWriter, and additional tracing levels.
These changes align with the new file-based approach. Tracing additions are helpful for debugging.
Also applies to: 60-64
120-129: Properly creating new DiskWriter.
Logical approach for on-demand creation of disk writers by schema_key. Check concurrency if multiple schema keys may be created simultaneously.
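A sketch of making per-schema creation race-free via the HashMap entry API while a single write lock is held; the generic closure stands in for the DiskWriter constructor, whose signature is not reproduced here:

use std::collections::HashMap;

// Fetch or lazily create the writer for a schema key; `entry` guarantees the
// value is initialized exactly once per key while the map's write lock is held.
fn writer_for<'a, W>(
    writers: &'a mut HashMap<String, W>,
    schema_key: &str,
    create: impl FnOnce() -> W,
) -> &'a mut W {
    writers.entry(schema_key.to_owned()).or_insert_with(create)
}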
140-140: Renamed path_by_current_time → path_prefix_by_current_time.
This clarifies that the returned path is only a prefix. Nicely done.
173-176: Filtering for .arrows extension is correct.
No immediate concerns. Please confirm that edge cases (e.g., hidden/system files) won't affect the logic.
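One way to make those edge cases explicit, as a sketch; the extension literal mirrors the .arrows files discussed in this PR, and the hidden-file rule is an assumption:

use std::path::Path;

// Accept only visible files ending in `.arrows`, skipping dotfiles and other
// system artifacts that may appear in the staging directory.
fn is_arrow_file(path: &Path) -> bool {
    let hidden = path
        .file_name()
        .and_then(|name| name.to_str())
        .is_some_and(|name| name.starts_with('.'));
    !hidden && path.extension().is_some_and(|ext| ext == "arrows")
}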
536-537: Use of MergedRecordReader::new to build updated schema.
Similar partial-read caveats apply; consider returning any encountered errors.
src/query/stream_schema_provider.rs (1)
222-272: Well-structured integration of time partitioning.
The time partition parameter is consistently used throughout the execution plan generation process, with proper error handling and integration with both arrow and parquet file processing.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/parseable/staging/writer.rs (1)
72-90: ⚠️ Potential issue: Fix missing count incrementation in write() method.
When self.count + rb.num_rows() >= N, the method writes the left slice without updating self.count, causing the row limit logic to skip subsequent rotations.
Apply this fix:
  let left_slice = rb.slice(0, left);
  self.inner.write(&left_slice)?;
+ self.count += left_slice.num_rows();
  self.finish()?;
🧹 Nitpick comments (3)
src/parseable/staging/writer.rs (3)
39-48: Document the const parameter N.
The struct uses a generic const parameter N for row limits, but its purpose and constraints are not documented.
Add documentation for the const parameter:
  /// Context regarding `.arrows` file being persisted onto disk
+ ///
+ /// # Type Parameters
+ ///
+ /// * `N` - Maximum number of rows allowed in a single Arrow file before rotation
  pub struct DiskWriter<const N: usize> {
112-116: Document the Writer struct and its const parameter.
The struct lacks documentation explaining its purpose and the significance of the const parameter N.
Add documentation:
  #[derive(Default)]
+ /// Manages both in-memory and on-disk storage of Arrow record batches
+ ///
+ /// # Type Parameters
+ ///
+ /// * `N` - Maximum number of rows allowed in a single batch before rotation,
+ ///   applies to both memory buffers and disk files
  pub struct Writer<const N: usize> {
72-90: Consider extracting common batch slicing logic.
The row limit and batch slicing logic is duplicated between DiskWriter::write() and MutableBuffer::push(). Consider extracting this into a shared trait or utility function.
Example implementation:
trait BatchSlicer {
    fn slice_at_limit<const N: usize>(current_count: usize, rb: &RecordBatch) -> Option<(RecordBatch, RecordBatch)> {
        if current_count + rb.num_rows() >= N {
            let left = N - current_count;
            let left_slice = rb.slice(0, left);
            let right_slice = if left < rb.num_rows() {
                Some(rb.slice(left, rb.num_rows() - left))
            } else {
                None
            };
            right_slice.map(|right| (left_slice, right))
        } else {
            None
        }
    }
}

impl<const N: usize> BatchSlicer for DiskWriter<N> {}
impl<const N: usize> BatchSlicer for MutableBuffer<N> {}
Also applies to: 188-212
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/staging/writer.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/parseable/staging/writer.rs (1)
20-38: LGTM! Imports and constants usage align with file-based operations.
The changes appropriately introduce file system and buffered I/O imports, replacing stream-based operations with file-based alternatives. The use of constants for file extensions promotes consistency.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/parseable/staging/writer.rs (1)
70-88: ⚠️ Potential issue: Fix missing row count increment in the if branch.
When self.count + rb.num_rows() >= N, the method writes the left slice without updating self.count, causing the row limit logic to skip subsequent rotations.
Apply this fix:
  let left = N - self.count;
  let left_slice = rb.slice(0, left);
  self.inner.write(&left_slice)?;
+ self.count += left;
  self.finish()?;
🧹 Nitpick comments (3)
src/parseable/streams.rs (3)
79-79: Document the significance of 16384.
The magic number 16384 used for the Writer's generic parameter should be documented or defined as a named constant to explain its significance.
141-164: Add documentation for path prefix generation.
The function would benefit from documentation explaining (see the sketch after this list):
- Why it was renamed from path_by_current_time
- The format of the generated path prefix
- The significance of each component in the path
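A hedged sketch of the requested doc comment, attached to a placeholder signature; the actual prefix layout and components should be filled in from the implementation in streams.rs:

use std::path::{Path, PathBuf};

/// Returns the *prefix* (not a complete filename) of the `.arrows` staging
/// path for the current time; the caller appends the remaining pieces (e.g.
/// the part/arrows extension) to build the final file name. Renamed from
/// `path_by_current_time` to make the prefix-only contract explicit.
///
/// NOTE: placeholder body and signature; document the real components
/// (staging dir, schema key, timestamp, ...) where the function is defined.
fn path_prefix_by_current_time(staging_dir: &Path) -> PathBuf {
    staging_dir.to_path_buf()
}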
383-389: Enhance error handling with more context.
While the error logging is good, it could be improved by:
- Including the file path in the warning message
- Adding error details to the debug message for better debugging
Apply this diff:
- warn!("Couldn't finish `.arrows` file: {err}"); + warn!("Couldn't finish `.arrows` file at {:?}: {err}", writer.path_prefix); - debug!("Finished `.arrows` file sync onto disk") + debug!("Finished `.arrows` file sync onto disk at {:?}", writer.path_prefix)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/parseable/staging/writer.rs (1 hunks)
src/parseable/streams.rs (13 hunks)
🧰 Additional context used
🪛 GitHub Actions: Lint, Test and Coverage Report
src/parseable/staging/writer.rs
[error] 118-118: this expression creates a reference which is immediately dereferenced by the compiler. Change this to: self.inner.schema().
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (4)
src/parseable/staging/writer.rs (3)
20-36: LGTM! Appropriate imports for file handling.
The imports are well-organized and include all necessary components for file I/O operations.
38-47: LGTM! Well-structured and documented struct.
The DiskWriter struct is well-designed with clear documentation for each field.
125-129: LGTM! Clean transition to generic Writer with DiskWriter.
The Writer struct is well-adapted to use the new DiskWriter with a configurable row limit.
src/parseable/streams.rs (1)
347-367: LGTM! Improved record batch handling.
The changes correctly:
- Add time partition support
- Fix the issue with discarding memory batches when no disk readers exist
- Combine records from both memory and .arrows files
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/parseable/staging/writer.rs (1)
63-67: ⚠️ Potential issue: Add row count tracking for proper file rotation.
The write method should track the number of rows written to ensure proper file rotation.
  pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
      self.inner.write(rb)?;
+     self.count += rb.num_rows();
      Ok(())
  }
🧹 Nitpick comments (1)
src/parseable/staging/mod.rs (1)
33-34: LGTM! Consider adding documentation.
The new RowLimit error variant aligns well with the PR's objective of handling multiple arrow files when row limits are breached. The error message is clear and includes the actual limit value.
Consider adding documentation to explain:
- The purpose of this error variant
- When it's triggered
- How it relates to the multiple arrow files feature
- What users should do when they encounter this error
  #[derive(Debug, thiserror::Error)]
  pub enum StagingError {
+     /// Thrown when the number of rows in an arrow file exceeds the configured limit.
+     /// This triggers the creation of a new arrow file to handle the overflow.
+     /// The usize parameter represents the configured row limit that was exceeded.
      #[error("Too many rows: {0}")]
      RowLimit(usize),
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/parseable/staging/mod.rs (1 hunks)
src/parseable/staging/writer.rs (3 hunks)
src/parseable/streams.rs (19 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (3)
src/parseable/staging/writer.rs (3)
20-34: LGTM! The imports and constants align with the PR objectives.
The new imports and constants support the transition from stream-based to file-based operations for managing Arrow files.
163-166: LGTM! The record batch slicing logic is improved.
The changes correctly handle the remaining portion of the batch when it exceeds the buffer limit.
176-180: LGTM! The Writer struct changes align with the PR objectives.
The changes successfully:
- Make the row limit configurable using const generics
- Replace stream-based operations with file-based operations
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/parseable/staging/writer.rs (1)
62-66: ⚠️ Potential issue: Missing count incrementation causes improper file rotation.
When writing a record batch, the method doesn't update a count to track the number of rows. This will likely impact the row limit logic implemented elsewhere.
You need to track the number of rows written to properly implement file rotation when row limits are reached.
🧹 Nitpick comments (1)
src/parseable/staging/reader.rs (1)
297-297: Outdated error message in test code.
The error message still references StreamWriter even though the code now uses FileWriter.
- let mut writer = FileWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+ let mut writer = FileWriter::try_new(file, schema).expect("Failed to create FileWriter");
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
src/parseable/staging/reader.rs (7 hunks)
src/parseable/staging/writer.rs (3 hunks)
src/parseable/streams.rs (17 hunks)
src/query/stream_schema_provider.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/query/stream_schema_provider.rs
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (4)
src/parseable/staging/writer.rs (3)
39-81: Excellent implementation of DiskWriter for better file handling.
The new DiskWriter struct provides a cleaner, file-based approach for persisting Arrow files to disk. It follows good practices with the use of part files for in-progress writes and atomic rename operations when finishing. Your implementation also properly propagates errors using the ? operator, addressing a previous review comment.
175-179: Good design choice making Writer generic over row size.
Making the Writer struct generic over the constant N allows for compile-time configuration of row limits, which is more efficient than runtime checking. The switch from StreamWriter to DiskWriter in the disk field properly implements the PR's objective.
162-165: Correct handling of partial record batches.
This implementation correctly handles the case where only part of a record batch fits within the buffer size limit by slicing the batch and pushing the remainder to the inner buffer.
src/parseable/staging/reader.rs (1)
325-325: Tests properly updated to use the new MergedRecordReader API.
The test cases have been correctly updated to use the new new() constructor and include the None parameter for time_partition, which aligns with the changes in the method signature.
Also applies to: 385-385
The main benefit of this PR is from the maintainability perspective; I have verified that it works in line with our expectations.
Fixes #XXXX.
Description
- Replaces StreamReader and StreamWriter with FileReader/FileWriter alternatives, ensuring code that persists .arrows files in staging is simplified.
- … (.arrows files) which haven't been converted into parquet yet.
This PR has: