diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 85a3a1f6bc07e..ddc0aa8bc46f1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -3496,7 +3496,7 @@ private boolean isLockRequiredForSingleWriter() { private void autoAdjustConfigsForConcurrencyMode(boolean isLockProviderPropertySet) { // for a single writer scenario, with all table services inline, lets set InProcessLockProvider - if (writeConfig.getWriteConcurrencyMode() == WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) { + if (writeConfig.isAutoAdjustLockConfigs() && writeConfig.getWriteConcurrencyMode() == WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) { if (writeConfig.getLockProviderClass() != null && !writeConfig.getLockProviderClass().equals(InProcessLockProvider.class.getCanonicalName())) { // add logs only when explicitly overridden by the user. LOG.warn(String.format("For a single writer mode, overriding lock provider class (%s) to %s. So, user configured lock provider %s may not take effect", diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java index e5618d69a359b..08a0ce4b1d24a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieTimeGeneratorConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -166,9 +167,17 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex // set required configs for rollback HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone()); // NOTE: at this stage rollback should use the current writer version and disable auto upgrade/downgrade - TypedProperties properties = config.getProps(); - properties.remove(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()); - properties.remove(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key()); + TypedProperties properties = new TypedProperties(); + properties.putAll(config.getProps()); + // TimeGeneratos are cached and re-used based on table base path. Since here we are changing the lock configurations, avoiding the cache use + // for upgrade code block. + properties.put(HoodieTimeGeneratorConfig.TIME_GENERATOR_REUSE_ENABLE.key(),"false"); + // override w/ NoopLock Provider to avoid re-entrant locking. already upgrade is happening within the table level lock. + // Belew we do trigger rollback and compaction which might again try to acquire the lock. So, here we are explicitly overriding to + // NoopLockProvider for just the upgrade code block. + properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),"org.apache.hudi.client.transaction.lock.NoopLockProvider"); + // if auto adjust it not disabled, chances that InProcessLockProvider will get overriden for single writer use-cases. + properties.put("hoodie.auto.adjust.lock.configs","false"); HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder() .withProps(properties) .withWriteTableVersion(tableVersion.versionCode()) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 69a48e2fd7b97..cb46cec824269 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.transaction.lock.NoopLockProvider; import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; @@ -529,6 +530,42 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName()); } + @Test + public void testTimeGeneratorConfig() { + + HoodieWriteConfig writeConfig = createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + } + }); + + // validate the InProcessLockProvider kicks in if no explicit lock provider is set. + assertEquals(InProcessLockProvider.class.getName(), writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())); + + writeConfig = createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), NoopLockProvider.class.getName()); + } + }); + + // validate the the configured lock provider is honored by the TimeGeneratorConfig as well. + assertEquals(NoopLockProvider.class.getName(), writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())); + + // if auto adjust lock config is enabled, for a single writer w/ all inline table services, InProcessLockProvider is overriden + writeConfig = createWriteConfig(new HashMap() { + { + put(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name()); + put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), NoopLockProvider.class.getName()); + put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true"); + } + }); + + // validate the InProcessLockProvider kicks in due to auto adjust lock configs + assertEquals(InProcessLockProvider.class.getName(), writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key())); + + } + @Test public void testConsistentBucketIndexDefaultClusteringConfig() { Properties props = new Properties(); diff --git a/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java new file mode 100644 index 0000000000000..65c6aa32b32e2 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.storage.StorageConfiguration; + +import org.jetbrains.annotations.NotNull; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * NoopLockProvider as the name suggests, is a no op lock provider. Any caller asking for a lock will be able to get hold of the lock. + * This is not meant to be used a producation grade lock providers. This is meant to be used for Hudi's internal operations. + * For eg: During upgrade, we have nested lock situations and we leverage this {@code NoopLockProvider} for any operations we + * might want to do within the upgradeHandler blocks to avoid re-entrant situations. Not all lock providers might support re-entrancy and during upgrade, + * it is expected to have a single writer to the Hudi table of interest. + */ +public class NoopLockProvider implements LockProvider, Serializable { + + public NoopLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration conf) { + // no op. + } + + @Override + public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException { + return true; + } + + @Override + public void unlock() { + // no op. + } + + @Override + public void lockInterruptibly() { + // no op. + } + + @Override + public void lock() { + // no op. + } + + @Override + public boolean tryLock() { + return true; + } + + @Override + public ReentrantReadWriteLock getLock() { + return new ReentrantReadWriteLock(); + } + + @Override + public String getCurrentOwnerLockInfo() { + return StringUtils.EMPTY_STRING; + } + + @Override + public void close() { + // no op. + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java index d48a9d0d8bcd3..16ebff9135b83 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java @@ -56,6 +56,15 @@ public class HoodieTimeGeneratorConfig extends HoodieConfig { .withDocumentation("The max expected clock skew time in ms between two processes generating time. Used by " + TimeGeneratorType.WAIT_TO_ADJUST_SKEW.name() + " time generator to implement TrueTime semantics."); + public static final ConfigProperty TIME_GENERATOR_REUSE_ENABLE = ConfigProperty + .key("_hoodie.time.generator.reuse.enable") + .defaultValue(true) + .sinceVersion("1.0.1") + .markAdvanced() + .withDocumentation("Used only for internal purposes. TimeGeneratos are cached per table base path and re-used across invocations. " + + "For some internal purposes, we wanted to avoid using the cached TimeGenerator (like upgrade flows). Hence this config " + + "will be used internally during upgrade flow. No advisable for end user to use this config. "); + private HoodieTimeGeneratorConfig() { super(); } @@ -68,6 +77,10 @@ public long getMaxExpectedClockSkewMs() { return getLong(MAX_EXPECTED_CLOCK_SKEW_MS); } + public boolean canReuseTimeGenerator() { + return getBoolean(TIME_GENERATOR_REUSE_ENABLE); + } + public String getBasePath() { return getString(BASE_PATH); } @@ -108,6 +121,11 @@ public Builder withMaxExpectedClockSkewMs(long skewMs) { return this; } + public Builder withReuseTimeGenerator(boolean reuseTimeGenerator) { + timeGeneratorConfig.setValue(TIME_GENERATOR_REUSE_ENABLE, String.valueOf(reuseTimeGenerator)); + return this; + } + public Builder withPath(String basePath) { timeGeneratorConfig.setValue(BASE_PATH, basePath); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java index 2dce07dc26717..b410af6f453d6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java @@ -42,12 +42,20 @@ public static TimeGenerator getTimeGenerator(HoodieTimeGeneratorConfig timeGener StorageConfiguration storageConf) { ValidationUtils.checkState(timeGeneratorConfig.contains(BASE_PATH), "Option [" + BASE_PATH.key() + "] is required"); ValidationUtils.checkArgument(storageConf != null, "Hadoop configuration is required"); - return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s -> { - TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType(); - if (Objects.requireNonNull(type) == TimeGeneratorType.WAIT_TO_ADJUST_SKEW) { - return new SkewAdjustingTimeGenerator(timeGeneratorConfig, storageConf); - } - throw new IllegalArgumentException("Unsupported TimeGenerator Type " + type); - }); + if (timeGeneratorConfig.canReuseTimeGenerator()) { + return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s -> getNewTimeGenerator(timeGeneratorConfig, storageConf)); + } else { + return getNewTimeGenerator(timeGeneratorConfig, storageConf); + } + } + + private static TimeGenerator getNewTimeGenerator(HoodieTimeGeneratorConfig timeGeneratorConfig, + StorageConfiguration storageConf) { + // reuse is set to false. + TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType(); + if (Objects.requireNonNull(type) == TimeGeneratorType.WAIT_TO_ADJUST_SKEW) { + return new SkewAdjustingTimeGenerator(timeGeneratorConfig, storageConf); + } + throw new IllegalArgumentException("Unsupported TimeGenerator Type " + type); } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java new file mode 100644 index 0000000000000..e27da253eb7c2 --- /dev/null +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.storage.StorageConfiguration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +/** + * Tests {@code NoopLockProvider}. + */ +public class TestNoopLockProvider { + + private static final Logger LOG = LoggerFactory.getLogger(TestNoopLockProvider.class); + private final StorageConfiguration storageConf = getDefaultStorageConf(); + private final LockConfiguration lockConfiguration1; + private final LockConfiguration lockConfiguration2; + + public TestNoopLockProvider() { + TypedProperties properties = new TypedProperties(); + properties.put(HoodieCommonConfig.BASE_PATH.key(), "table1"); + lockConfiguration1 = new LockConfiguration(properties); + properties.put(HoodieCommonConfig.BASE_PATH.key(), "table2"); + lockConfiguration2 = new LockConfiguration(properties); + } + + @Test + public void testLockAcquisition() { + NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider.unlock(); + }); + } + + @Test + public void testLockReAcquisitionBySameThread() { + NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider.unlock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + } + + @Test + public void testLockReAcquisitionBySameThreadWithTwoTables() { + NoopLockProvider noopLockProvider1 = new NoopLockProvider(lockConfiguration1, storageConf); + NoopLockProvider noopLockProvider2 = new NoopLockProvider(lockConfiguration2, storageConf); + + assertDoesNotThrow(() -> { + noopLockProvider1.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider2.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider1.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider1.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider1.unlock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider2.unlock(); + }); + } + + @Test + public void testLockReAcquisitionByDifferentThread() { + NoopLockProvider noopLockProvider = new NoopLockProvider(lockConfiguration1, storageConf); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + + // Main test thread + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + + // Another writer thread in parallel, should be able to acquire the lock instantly + Thread writer2 = new Thread(new Runnable() { + @Override + public void run() { + assertDoesNotThrow(() -> { + noopLockProvider.lock(); + }); + assertDoesNotThrow(() -> { + noopLockProvider.unlock(); + }); + writer2Completed.set(true); + } + }); + writer2.start(); + + assertDoesNotThrow(() -> { + noopLockProvider.unlock(); + }); + + try { + writer2.join(); + } catch (InterruptedException e) { + // + } + Assertions.assertTrue(writer2Completed.get()); + + writer2.interrupt(); + } +} diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java index f6e92943cdf44..d776229e63e22 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + public class TestWaitBasedTimeGenerator { public static class MockInProcessLockProvider extends InProcessLockProvider { @@ -145,4 +149,29 @@ public void testSlowerThreadLaterAcquiredLock(boolean slowerThreadAcquiredLockLa Assertions.assertTrue(t2Timestamp.get() < t1Timestamp.get()); } } + + @Test + public void testTimeGeneratorCache() { + TimeGenerator timeGenerator1 = TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf); + TimeGenerator timeGenerator2 = TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf); + TimeGenerator timeGenerator3 = TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf); + + assertEquals(timeGenerator1, timeGenerator2); + assertEquals(timeGenerator1, timeGenerator3); + + // disable reuse + HoodieTimeGeneratorConfig timeGeneratorConfigWithNoReuse = HoodieTimeGeneratorConfig.newBuilder() + .withPath("test_wait_based") + .withMaxExpectedClockSkewMs(25L) + .withReuseTimeGenerator(false) + .withTimeGeneratorType(TimeGeneratorType.WAIT_TO_ADJUST_SKEW) + .build(); + + TimeGenerator timeGenerator4 = TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf); + assertNotEquals(timeGenerator1, timeGenerator4); + // how many ever times we call, we should get new time generator + TimeGenerator timeGenerator5 = TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf); + assertNotEquals(timeGenerator4, timeGenerator5); + } + } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala index de2b418a7a989..34d4ab8f07671 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala @@ -19,34 +19,48 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions -import org.apache.hudi.client.transaction.lock.InProcessLockProvider +import org.apache.hudi.client.transaction.lock.{InProcessLockProvider, NoopLockProvider} import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} import org.apache.hudi.config.HoodieLockConfig +import org.apache.hudi.functional.TestSevenToEightUpgrade.{InProcessLockProviderClass, NoOpLockProviderClass} import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade} import org.apache.spark.sql.SaveMode import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.EnumSource +import org.junit.jupiter.params.provider.{CsvSource, EnumSource} class TestSevenToEightUpgrade extends RecordLevelIndexTestBase { @ParameterizedTest - @EnumSource(classOf[HoodieTableType]) - def testPartitionFieldsWithUpgrade(tableType: HoodieTableType): Unit = { + @CsvSource(value = Array( + "COPY_ON_WRITE,null", + "COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.InProcessLockProvider", + "COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.NoopLockProvider", + "MERGE_ON_READ,null", + "MERGE_ON_READ,org.apache.hudi.client.transaction.lock.InProcessLockProvider", + "MERGE_ON_READ,org.apache.hudi.client.transaction.lock.NoopLockProvider" + )) + def testPartitionFieldsWithUpgrade(tableType: HoodieTableType, lockProviderClass: String): Unit = { val partitionFields = "partition:simple" // Downgrade handling for metadata not yet ready. - val hudiOpts = commonOpts ++ Map( + val hudiOptsWithoutLockConfigs = commonOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName, DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields, "hoodie.metadata.enable" -> "false", // "OverwriteWithLatestAvroPayload" is used to trigger merge mode upgrade/downgrade. DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName, - DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name, - HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> classOf[InProcessLockProvider].getName) + DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name) + + val hudiOpts = if (!lockProviderClass.equals("null")) { + hudiOptsWithoutLockConfigs ++ Map(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> lockProviderClass) + } else { + hudiOptsWithoutLockConfigs + } doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, @@ -91,3 +105,9 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase { } } } + +object TestSevenToEightUpgrade{ + val InProcessLockProviderClass = classOf[InProcessLockProvider].getName + val NoOpLockProviderClass = classOf[NoopLockProvider].getName + +} \ No newline at end of file