From 8021816edcaf63a26aa65556a090e908c6bcdeaf Mon Sep 17 00:00:00 2001
From: TheR1sing3un
Date: Tue, 21 Jan 2025 12:03:42 +0800
Subject: [PATCH] fix: fix incremental read with invalid/missing end instant

1. Return an empty result when an incremental query's commit range contains no
   instants, instead of failing while resolving a missing end instant; resolve
   the end instant from the query context's last instant.
2. Parameterize TestCOWDataSourceStorage on the file group reader flag and apply
   the shared read options to every snapshot and incremental read in the test.

A minimal sketch of the empty-range scenario is appended after the diff.

Signed-off-by: TheR1sing3un
---
 .../apache/hudi/IncrementalRelationV2.scala   | 13 +++++--
 .../functional/TestCOWDataSourceStorage.scala | 39 ++++++++++++-------
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
index 08423c226c20f..d6da73e4b657e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
@@ -207,7 +207,13 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
     val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
 
     val startInstantArchived = !queryContext.getArchivedInstants.isEmpty
-    val endInstantTime = queryContext.getEndInstant.get()
+    if (queryContext.isEmpty) {
+      // no commits to read in the requested range
+      // scalastyle:off return
+      return sqlContext.sparkContext.emptyRDD[Row]
+      // scalastyle:on return
+    } else {
+      val endInstantTime = queryContext.getLastInstant
 
     val scanDf = if (fallbackToFullTableScan && startInstantArchived) {
       log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived")
@@ -265,7 +271,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
             .load(filteredRegularFullPaths.toList: _*)
             .filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: _*)))
         } catch {
-          case e : AnalysisException =>
+          case e: AnalysisException =>
             if (e.getMessage.contains("Path does not exist")) {
               throw new HoodieIncrementalPathNotFoundException(e)
             } else {
@@ -278,7 +284,8 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
       }
     }
 
-    filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
+      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
+    }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index f5d42fc87ac67..a0bea3453bd66 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, SqlQueryInequalityPreCommitValidator}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -32,9 +32,8 @@ import org.apache.hudi.exception.{HoodieUpsertException, HoodieValidationException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.{DataSourceTestUtils, SparkClientFunctionalTestHarness}
-import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, read}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.functions.{col, lit}
@@ -71,20 +70,29 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
-    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
-    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
-    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
-    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
-    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
   ), delimiter = '|')
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
     var options: Map[String, String] = commonOpts ++ Map(
       HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
       HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
+    val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(isMetadataEnabled),
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(fileGroupReaderEnabled))
+
     val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
     if (isTimestampBasedKeyGen) {
       options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
@@ -108,7 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF1.count())
 
@@ -135,7 +143,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .save(basePath)
 
     val snapshotDF2 = spark.read.format("hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF2.count())
     assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
@@ -174,7 +182,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
@@ -184,6 +192,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     // Setting HoodieROTablePathFilter here to test whether pathFilter can filter out correctly for IncrementalRelation
     spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", "org.apache.hudi.hadoop.HoodieROTablePathFilter")
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
@@ -196,6 +205,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
     // Test incremental query has no instant in range
     val emptyIncDF = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, "002")
@@ -212,6 +222,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
       .load(basePath)
@@ -223,6 +234,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
 
     // pull the latest commit within certain partitions
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
       .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
@@ -231,6 +243,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .load(basePath)
     assertEquals(hoodieIncViewDF2
       .filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())
 
     val timeTravelDF = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
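
A minimal sketch of the empty-range scenario this change addresses, mirroring the emptyIncDF case in the test above. It assumes an existing SparkSession named spark and a Hudi table at basePath; the option keys are the DataSourceReadOptions constants already used in the diff.

// Illustrative sketch only, not part of the patch: an incremental read over a
// commit range that matches no instants. Before this fix, resolving the end
// instant for such a range could fail; with the fix, the query comes back empty.
import org.apache.hudi.DataSourceReadOptions

val emptyIncDF = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.START_COMMIT.key, "000") // range deliberately below any real completion time
  .option(DataSourceReadOptions.END_COMMIT.key, "002")
  .load(basePath)

assert(emptyIncDF.count() == 0)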