[HUDI-8890] fix incremental read with invalid/missing end instant #12679

Open · wants to merge 1 commit into base: master
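Summary of the change (paraphrased from the diff below): IncrementalRelationV2 previously resolved the end instant with queryContext.getEndInstant.get(), which breaks when the requested incremental range contains no completed instant. The change short-circuits an empty query context to an empty RDD and otherwise takes the end instant from queryContext.getLastInstant. TestCOWDataSourceStorage is also parameterized over HoodieReaderConfig.FILE_GROUP_READER_ENABLED so the snapshot and incremental reads run with the file group reader both on and off.

A minimal sketch of the read this fix affects (an incremental query whose commit range holds no instants), assuming a Hudi table at basePath whose first commit completes after "002"; the commit times are placeholders and the option keys are the same DataSourceReadOptions constants used in the test below:

    val emptyIncDF = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.START_COMMIT.key, "000") // range start before any commit
      .option(DataSourceReadOptions.END_COMMIT.key, "002")   // range end also before the first commit
      .load(basePath)
    // With this change the relation returns an empty result instead of failing on the missing end instant.
    assertEquals(0, emptyIncDF.count())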
@@ -207,7 +207,13 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))

val startInstantArchived = !queryContext.getArchivedInstants.isEmpty
val endInstantTime = queryContext.getEndInstant.get()
if (queryContext.isEmpty) {
// no commits to read
// scalastyle:off return
return sqlContext.sparkContext.emptyRDD[Row]
// scalastyle:on return
}
val endInstantTime = queryContext.getLastInstant

val scanDf = if (fallbackToFullTableScan && startInstantArchived) {
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived")
@@ -265,7 +271,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
.load(filteredRegularFullPaths.toList: _*)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: _*)))
} catch {
case e : AnalysisException =>
case e: AnalysisException =>
if (e.getMessage.contains("Path does not exist")) {
throw new HoodieIncrementalPathNotFoundException(e)
} else {
@@ -21,7 +21,7 @@ package org.apache.hudi.functional

import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, SqlQueryInequalityPreCommitValidator}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -32,9 +32,8 @@ import org.apache.hudi.exception.{HoodieUpsertException, HoodieValidationExcepti
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.{DataSourceTestUtils, SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, read}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{col, lit}
@@ -71,20 +70,29 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

@ParameterizedTest
@CsvSource(value = Array(
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
), delimiter = '|')
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
var options: Map[String, String] = commonOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")

val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(isMetadataEnabled),
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(fileGroupReaderEnabled))

val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
if (isTimestampBasedKeyGen) {
options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
@@ -108,7 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF1.count())

@@ -135,7 +143,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.save(basePath)

val snapshotDF2 = spark.read.format("hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF2.count())
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
@@ -174,7 +182,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Snapshot Query
val snapshotDF3 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated

@@ -184,6 +192,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Setting HoodieROTablePathFilter here to test whether pathFilter can filter out correctly for IncrementalRelation
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", "org.apache.hudi.hadoop.HoodieROTablePathFilter")
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
@@ -196,6 +205,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// Test incremental query has no instant in range
val emptyIncDF = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, "002")
@@ -212,6 +222,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// pull the latest commit
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.load(basePath)
@@ -223,6 +234,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {

// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
@@ -231,6 +243,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())

val timeTravelDF = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)