diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index b744e4fd87da5..28b7e79f86aea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -285,6 +285,20 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieC } } + /** + * Update the transaction manager lock requirement if necessary. + * + * @param oldLockRequired whether lock was required before. + * @return whether lock was disabled. + */ + protected boolean updateTxnManagerLockRequirementIfNecessary(boolean oldLockRequired) { + boolean shouldDisableLock = oldLockRequired && !config.isLockRequired(); + if (shouldDisableLock) { + txnManager.setIsLockRequired(false); + } + return shouldDisableLock; + } + /** * Updates the cols being indexed with column stats. This is for tracking purpose so that queries can leverage col stats * from MDT only for indexed columns. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index a5beda7c91350..4205f0dd7fc76 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -336,6 +336,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab handleWriteErrors(writeStats, TableServiceType.COMPACT); InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator(); final HoodieInstant compactionInstant = instantGenerator.getCompactionInflightInstant(compactionCommitTime); + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); try { this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); finalizeWrite(table, compactionCommitTime, writeStats); @@ -346,6 +348,10 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(compactionInstant)); + // Restore the old lock requirement if it was updated. + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } releaseResources(compactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) @@ -398,6 +404,8 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable handleWriteErrors(writeStats, TableServiceType.LOG_COMPACT); final HoodieInstant logCompactionInstant = table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime); + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); try { this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty()); preCommit(metadata); @@ -409,6 +417,10 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata); } finally { this.txnManager.endTransaction(Option.of(logCompactionInstant)); + // Restore the old lock requirement if it was updated. + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } releaseResources(logCompactionCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime) @@ -528,6 +540,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, handleWriteErrors(writeStats, TableServiceType.CLUSTER); final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime, table.getActiveTimeline(), table.getMetaClient().getInstantGenerator()).get(); + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); try { this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); @@ -549,6 +563,10 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); } finally { this.txnManager.endTransaction(Option.of(clusteringInstant)); + // Restore the old lock requirement if it was updated. + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } releaseResources(clusteringCommitTime); } WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) @@ -633,12 +651,18 @@ public Option scheduleTableService(String instantTime, Option inflightInstant = Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, tableServiceType.getAction(), instantTime)); + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); try { this.txnManager.beginTransaction(inflightInstant, Option.empty()); LOG.info("Scheduling table service {} for table {}", tableServiceType, config.getBasePath()); return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType); } finally { this.txnManager.endTransaction(inflightInstant); + // Restore the old lock requirement if it was updated. + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } } } @@ -967,7 +991,7 @@ protected Boolean rollbackFailedWrites() { } protected void rollbackFailedWrites(Map> instantsToRollback) { - rollbackFailedWrites(instantsToRollback, false, false); + rollbackFailedWrites(instantsToRollback, !config.isLockRequired(), false); } protected void rollbackFailedWrites(Map> instantsToRollback, boolean skipLocking, boolean skipVersionCheck) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 82d34c5515d2b..583925795c972 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -237,6 +237,8 @@ public boolean commitStats(String instantTime, List stats, extraMetadata, operationType, config.getWriteSchema(), commitActionType); HoodieInstant inflightInstant = table.getMetaClient().createNewInstant(State.INFLIGHT, commitActionType, instantTime); HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config); + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); this.txnManager.beginTransaction(Option.of(inflightInstant), lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty()); try { @@ -251,6 +253,10 @@ public boolean commitStats(String instantTime, List stats, throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e); } finally { this.txnManager.endTransaction(Option.of(inflightInstant)); + // Restore the old lock requirement if it was updated + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } releaseResources(instantTime); } @@ -994,6 +1000,17 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC } } + /** + * Schedules a new compaction instant. + * + * @param extraMetadata Extra Metadata to be stored + * @param shouldLock whether to lock for time generation + */ + public Option scheduleCompaction(Option> extraMetadata, boolean shouldLock) throws HoodieIOException { + String instantTime = createNewInstantTime(shouldLock); + return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty(); + } + /** * Schedules a new compaction instant. * @param extraMetadata Extra Metadata to be stored @@ -1308,11 +1325,17 @@ protected void doInitTable(WriteOperationType operationType, HoodieTableMetaClie } private void executeUsingTxnManager(Option ownerInstant, Runnable r) { + boolean oldLockRequired = txnManager.isLockRequired(); + boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired); this.txnManager.beginTransaction(ownerInstant, Option.empty()); try { r.run(); } finally { this.txnManager.endTransaction(ownerInstant); + // Restore the old lock requirement if it was updated + if (isTxnManagerLockRequirementUpdated) { + txnManager.setIsLockRequired(oldLockRequired); + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java index b725243fca0ad..b1610ebefcc3d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java @@ -37,7 +37,7 @@ public class TransactionManager implements Serializable { protected static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class); protected final LockManager lockManager; - protected final boolean isLockRequired; + protected boolean isLockRequired; protected Option currentTxnOwnerInstant = Option.empty(); private Option lastCompletedTxnOwnerInstant = Option.empty(); @@ -105,4 +105,12 @@ public Option getCurrentTransactionOwner() { public boolean isLockRequired() { return isLockRequired; } + + /** + * Lock requirement for transaction manager can change from within a transaction. + * For example, when a compaction is scheduled during upgrade, the lock requirement is set to false. + */ + public void setIsLockRequired(boolean isLockRequired) { + this.isLockRequired = isLockRequired; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 85a3a1f6bc07e..6a8817a81cd84 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1242,7 +1242,8 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) { this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build(); this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build(); this.timeGeneratorConfig = HoodieTimeGeneratorConfig.newBuilder().fromProperties(props) - .withDefaultLockProvider(!isLockRequired()).build(); + // Instant time generation needs a lock. If lock provider is not set, use default lock provider + .withDefaultLockProvider(!isLockProviderSet()).build(); this.indexingConfig = HoodieIndexingConfig.newBuilder().fromProperties(props).build(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java index e5618d69a359b..44aac112852cb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java @@ -191,7 +191,7 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) { writeClient.rollbackFailedWrites(); if (shouldCompact) { - Option compactionInstantOpt = writeClient.scheduleCompaction(Option.empty()); + Option compactionInstantOpt = writeClient.scheduleCompaction(Option.empty(), writeClient.getConfig().isLockRequired()); if (compactionInstantOpt.isPresent()) { writeClient.compact(compactionInstantOpt.get()); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala index 862cd4f9ae904..4c4d68bae1350 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala @@ -19,9 +19,11 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.client.transaction.lock.InProcessLockProvider import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} +import org.apache.hudi.config.HoodieLockConfig import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade} import org.apache.spark.sql.SaveMode @@ -43,6 +45,7 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase { "hoodie.metadata.enable" -> "false", // "OverwriteWithLatestAvroPayload" is used to trigger merge mode upgrade/downgrade. DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName, + HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> classOf[InProcessLockProvider].getName, DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name) doWriteAndValidateDataAndRecordIndex(hudiOpts,