fix: fix incremental read with invalid/missing end instant
1. Fix incremental read with an invalid or missing end instant.

Signed-off-by: TheR1sing3un <[email protected]>
TheR1sing3un committed Jan 24, 2025
1 parent 696683f commit 6f22ae7
Showing 2 changed files with 35 additions and 15 deletions.
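
The scenario behind the fix is an incremental query whose commit range contains no completed instants, as exercised by the "no instant in range" test further down. A minimal sketch of that read from the user side (the Spark session setup and table path are illustrative placeholders, not part of this commit); with this change the query resolves to an empty result instead of failing on a missing end instant:

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.spark.sql.SparkSession

    // Illustrative setup only: a real job needs an existing Hudi table at basePath
    // and the usual Hudi Spark configs (e.g. Kryo serializer).
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val basePath = "/tmp/hudi_cow_table"

    // Incremental read over a commit range ("000" to "002") that contains no
    // completed instants. With this fix the relation returns an empty result
    // instead of trying to resolve a non-existent end instant.
    val emptyIncDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.START_COMMIT.key, "000")
      .option(DataSourceReadOptions.END_COMMIT.key, "002")
      .load(basePath)

    assert(emptyIncDF.count() == 0)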
@@ -207,7 +207,13 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))

val startInstantArchived = !queryContext.getArchivedInstants.isEmpty
val endInstantTime = queryContext.getEndInstant.get()
if (queryContext.isEmpty) {
// no commits to read
// scalastyle:off return
return sqlContext.sparkContext.emptyRDD[Row]
// scalastyle:on return
}
val endInstantTime = queryContext.getLastInstant

val scanDf = if (fallbackToFullTableScan && startInstantArchived) {
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived")
@@ -265,7 +271,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
.load(filteredRegularFullPaths.toList: _*)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: _*)))
} catch {
case e : AnalysisException =>
case e: AnalysisException =>
if (e.getMessage.contains("Path does not exist")) {
throw new HoodieIncrementalPathNotFoundException(e)
} else {
@@ -279,6 +285,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}

filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
}
}
}
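
In the first hunk above, the scan now short-circuits when the query context holds no commits: it returns sqlContext.sparkContext.emptyRDD[Row] before any end-instant handling, and the end instant is then taken from queryContext.getLastInstant rather than unwrapped with getEndInstant.get(), which the old code called unconditionally and which presumably failed when the end instant was invalid or missing.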

@@ -21,7 +21,7 @@ package org.apache.hudi.functional

import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, SqlQueryInequalityPreCommitValidator}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -32,9 +32,8 @@ import org.apache.hudi.exception.{HoodieUpsertException, HoodieValidationExcepti
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.{DataSourceTestUtils, SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, read}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{col, lit}
@@ -71,20 +70,29 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

@ParameterizedTest
@CsvSource(value = Array(
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
), delimiter = '|')
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
var options: Map[String, String] = commonOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")

val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(isMetadataEnabled),
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(fileGroupReaderEnabled))

val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
if (isTimestampBasedKeyGen) {
options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
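
The test above gains a fourth CSV column, fileGroupReaderEnabled, and the new readOptions map (HoodieMetadataConfig.ENABLE plus HoodieReaderConfig.FILE_GROUP_READER_ENABLED) is applied to each of the reads in the hunks that follow, so the incremental-read fix is exercised with the file group reader both enabled and disabled.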
@@ -108,7 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF1.count())

@@ -135,7 +143,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.save(basePath)

val snapshotDF2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF2.count())
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
@@ -174,7 +182,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Snapshot Query
val snapshotDF3 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated

@@ -184,6 +192,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Setting HoodieROTablePathFilter here to test whether pathFilter can filter out correctly for IncrementalRelation
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", "org.apache.hudi.hadoop.HoodieROTablePathFilter")
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
@@ -196,6 +205,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Test incremental query has no instant in range
val emptyIncDF = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, "002")
@@ -212,6 +222,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// pull the latest commit
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.load(basePath)
@@ -223,6 +234,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
@@ -231,6 +243,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())

val timeTravelDF = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)