diff --git a/README.md b/README.md
index 2bc0fa8..e639ae4 100644
--- a/README.md
+++ b/README.md
@@ -51,41 +51,162 @@ users and projects.
 
 | **PyPI**      | `pip install hudi` |
 | **Crates.io** | `cargo add hudi`   |
 
-## Example usage
+## Usage Examples
 
 > [!NOTE]
-> These examples expect a Hudi table exists at `/tmp/trips_table`, created using
+> These examples expect a Hudi table to exist at `/tmp/trips_table`, created using
 > the [quick start guide](https://hudi.apache.org/docs/quick-start-guide).
 
-### Python
+### Snapshot Query
 
-Read a Hudi table into a PyArrow table.
+A snapshot query reads the latest version of the data from the table. The table API also accepts partition filters.
+
+#### Python
 
 ```python
 from hudi import HudiTableBuilder
 import pyarrow as pa
 
+hudi_table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()
+batches = hudi_table.read_snapshot(filters=[("city", "=", "san_francisco")])
+
+# convert to a PyArrow table
+arrow_table = pa.Table.from_batches(batches)
+result = arrow_table.select(["rider", "city", "ts", "fare"])
+print(result)
+```
+
+#### Rust
+
+```rust
+use hudi::error::Result;
+use hudi::table::builder::TableBuilder as HudiTableBuilder;
+use arrow::compute::concat_batches;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
+    let batches = hudi_table.read_snapshot(&[("city", "=", "san_francisco")]).await?;
+    let batch = concat_batches(&batches[0].schema(), &batches)?;
+    let columns = vec!["rider", "city", "ts", "fare"];
+    for col_name in columns {
+        let idx = batch.schema().index_of(col_name).unwrap();
+        println!("{}: {}", col_name, batch.column(idx));
+    }
+    Ok(())
+}
+```
+
+To run a read-optimized (RO) query on Merge-on-Read (MOR) tables, set `hoodie.read.use.read_optimized.mode` to `true` when creating the table.
+
+#### Python
+
+```python
 hudi_table = (
     HudiTableBuilder
     .from_base_uri("/tmp/trips_table")
-    .with_option("hoodie.read.as.of.timestamp", "20241122010827898")
+    .with_option("hoodie.read.use.read_optimized.mode", "true")
     .build()
 )
-records = hudi_table.read_snapshot(filters=[("city", "=", "san_francisco")])
+```
 
-arrow_table = pa.Table.from_batches(records)
-result = arrow_table.select(["rider", "city", "ts", "fare"])
-print(result)
+#### Rust
+
+```rust
+let hudi_table =
+    HudiTableBuilder::from_base_uri("/tmp/trips_table")
+        .with_option("hoodie.read.use.read_optimized.mode", "true")
+        .build().await?;
+```
+
+> [!NOTE]
+> Currently, reading MOR tables is limited to tables with Parquet data blocks.
+
+### Time-Travel Query
+
+A time-travel query reads the table's data as of a specific timestamp. The table API also accepts partition filters.
+
+#### Python
+
+```python
+batches = (
+    hudi_table
+    .read_snapshot_as_of("20241231123456789", filters=[("city", "=", "san_francisco")])
+)
+```
+
+#### Rust
+
+```rust
+let batches =
+    hudi_table
+        .read_snapshot_as_of("20241231123456789", &[("city", "=", "san_francisco")]).await?;
 ```
 
-### Rust (DataFusion)
+### Incremental Query
+
+An incremental query reads the records that changed within a given time range. In the examples below, `t1` and `t2` are commit timestamps in the Hudi Timeline format (see the note after the examples).
+
+#### Python
+
+```python
+# read the records between t1 (exclusive) and t2 (inclusive)
+batches = hudi_table.read_incremental_records(t1, t2)
+
+# read the records after t1
+batches = hudi_table.read_incremental_records(t1)
+```
+
+#### Rust
+
+```rust
+// read the records between t1 (exclusive) and t2 (inclusive)
+let batches = hudi_table.read_incremental_records(t1, Some(t2)).await?;
+
+// read the records after t1
+let batches = hudi_table.read_incremental_records(t1, None).await?;
+```
+
+> [!NOTE]
+> Currently, the only supported formats for the timestamp arguments are the Hudi Timeline formats: `yyyyMMddHHmmssSSS` and `yyyyMMddHHmmss`.
+
+## Query Engine Integration
+
+Hudi-rs provides APIs to support integration with query engines. The sections below highlight some commonly used APIs, followed by a sketch of how they fit together.
+
+### Table API
+
+Create a Hudi table instance using its constructor or the `TableBuilder` API.
+
+| Stage           | API                                       | Description                                                                            |
+|-----------------|-------------------------------------------|----------------------------------------------------------------------------------------|
+| Query planning  | `get_file_slices()`                       | For snapshot queries, get a list of file slices.                                       |
+|                 | `get_file_slices_splits()`                | For snapshot queries, get a list of file slices in splits.                             |
+|                 | `get_file_slices_as_of()`                 | For time-travel queries, get a list of file slices at a given time.                    |
+|                 | `get_file_slices_splits_as_of()`          | For time-travel queries, get a list of file slices in splits at a given time.          |
+|                 | `get_file_slices_between()`               | For incremental queries, get a list of changed file slices within a given time range.  |
+| Query execution | `create_file_group_reader_with_options()` | Create a file group reader instance with the table instance's configs.                 |
+
+### File Group API
+
+Create a Hudi file group reader instance using its constructor or the Hudi table API `create_file_group_reader_with_options()`.
+
+| Stage           | API                 | Description                                                                                                                                                        |
+|-----------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Query execution | `read_file_slice()` | Read records from a given file slice; depending on the configs, read from the base file only, or read the base file and log files and merge records using the configured strategy. |
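+
+The sketch below shows how the two APIs fit together in an engine integration: the table API
+plans a snapshot query by listing file slices, and the file group reader executes it by reading
+each slice. This is a minimal illustration, not the exact API surface: the filter and option
+arguments and the return types are assumptions, so consult the crate's API docs when integrating.
+
+```rust
+use hudi::error::Result;
+use hudi::table::builder::TableBuilder as HudiTableBuilder;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let hudi_table = HudiTableBuilder::from_base_uri("/tmp/trips_table").build().await?;
+
+    // Query planning: list the file slices a snapshot query needs to scan.
+    // An engine would typically distribute these across its scan tasks.
+    let file_slices = hudi_table.get_file_slices(&[]).await?;
+
+    // Query execution: a file group reader carrying the table's configs
+    // (e.g., whether to merge log files or read only base files for MOR).
+    let reader = hudi_table
+        .create_file_group_reader_with_options([("hoodie.read.use.read_optimized.mode", "false")])?;
+    for file_slice in &file_slices {
+        let batch = reader.read_file_slice(file_slice).await?;
+        println!("read {} records from one file slice", batch.num_rows());
+    }
+    Ok(())
+}
+```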
+### Apache DataFusion
+
+Enabling the `datafusion` feature of the `hudi` crate provides a
+[DataFusion](https://datafusion.apache.org/) extension for querying Hudi tables.
 
 Add the crate `hudi` with the `datafusion` feature to your application:
 
 ```shell
 cargo new my_project --bin && cd my_project
-cargo add tokio@1 datafusion@42
+cargo add tokio@1 datafusion@43
 cargo add hudi --features datafusion
 ```
 
@@ -105,7 +226,7 @@ async fn main() -> Result<()> {
     let ctx = SessionContext::new();
     let hudi = HudiDataSource::new_with_options(
         "/tmp/trips_table",
-        [("hoodie.read.as.of.timestamp", "20241122010827898")]).await?;
+        [("hoodie.read.input.partitions", "5")]).await?;
     ctx.register_table("trips_table", Arc::new(hudi))?;
     let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
     df.show().await?;
@@ -113,15 +234,22 @@
 }
 ```
 
+### Other Integrations
+
+Hudi is also integrated with:
+
+- [Daft](https://www.getdaft.io/projects/docs/en/stable/user_guide/integrations/hudi.html)
+- [Ray](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_hudi.html#ray.data.read_hudi)
+
 ### Work with cloud storage
 
 Ensure cloud storage credentials are set properly as environment variables, e.g., `AWS_*`, `AZURE_*`,
 or `GOOGLE_*`. Relevant storage environment variables will then be picked up. The target table's
 base URI with schemes such as `s3://`, `az://`, or `gs://` will be processed accordingly.
 
-Alternatively, you can pass the storage configuration as options to the `HudiTableBuilder` or `HudiDataSource`.
+Alternatively, you can pass the storage configuration as options via the table APIs.
 
-### Python
+#### Python
 
 ```python
 from hudi import HudiTableBuilder
 
@@ -134,18 +262,17 @@ hudi_table = (
     HudiTableBuilder
     .from_base_uri("s3://bucket/trips_table")
     .with_option("aws_region", "us-west-2")
     .build()
 )
 ```
 
-### Rust (DataFusion)
+#### Rust
 
 ```rust
-use hudi::HudiDataSource;
+use hudi::error::Result;
+use hudi::table::builder::TableBuilder as HudiTableBuilder;
 
+#[tokio::main]
 async fn main() -> Result<()> {
-    let hudi = HudiDataSource::new_with_options(
-        "s3://bucket/trips_table",
-        [("aws_region", "us-west-2")]
-    ).await?;
+    let hudi_table =
+        HudiTableBuilder::from_base_uri("s3://bucket/trips_table")
+            .with_option("aws_region", "us-west-2")
+            .build().await?;
+    Ok(())
 }
-
 ```
 
 ## Contributing
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 82f6243..8d6a137 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -110,7 +110,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use url::Url;
 
-/// Hudi Table in-memory
+/// The main struct that provides table APIs for interacting with a Hudi table.
 #[derive(Clone, Debug)]
 pub struct Table {
     pub hudi_configs: Arc<HudiConfigs>,
diff --git a/demo/README.md b/demo/README.md
new file mode 100644
index 0000000..fa22e2f
--- /dev/null
+++ b/demo/README.md
@@ -0,0 +1,26 @@
+
+# Demo
+
+The demo runs on Docker Compose; the infrastructure is defined in [`compose.yaml`](compose.yaml)
+and [`infra`](infra).
+
+Multiple independent demo apps reside in directories such as [`table-api-python`](table-api-python).
+
+The demo apps are also used for integration tests.