Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8930] Set transaction manager lock requirement based on config #12733

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,20 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieC
}
}

/**
* Update the transaction manager lock requirement if necessary.
*
* @param oldLockRequired whether lock was required before.
* @return whether lock was disabled.
*/
protected boolean updateTxnManagerLockRequirementIfNecessary(boolean oldLockRequired) {
boolean shouldDisableLock = oldLockRequired && !config.isLockRequired();
if (shouldDisableLock) {
txnManager.setIsLockRequired(false);
}
return shouldDisableLock;
}

/**
* Updates the cols being indexed with column stats. This is for tracking purpose so that queries can leverage col stats
* from MDT only for indexed columns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
handleWriteErrors(writeStats, TableServiceType.COMPACT);
InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
final HoodieInstant compactionInstant = instantGenerator.getCompactionInflightInstant(compactionCommitTime);
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
Expand All @@ -346,6 +348,10 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
this.txnManager.endTransaction(Option.of(compactionInstant));
// Restore the old lock requirement if it was updated.
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
releaseResources(compactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
Expand Down Expand Up @@ -398,6 +404,8 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
handleWriteErrors(writeStats, TableServiceType.LOG_COMPACT);
final HoodieInstant logCompactionInstant = table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION,
logCompactionCommitTime);
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
try {
this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
preCommit(metadata);
Expand All @@ -409,6 +417,10 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
} finally {
this.txnManager.endTransaction(Option.of(logCompactionInstant));
// Restore the old lock requirement if it was updated.
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
releaseResources(logCompactionCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime)
Expand Down Expand Up @@ -528,6 +540,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
handleWriteErrors(writeStats, TableServiceType.CLUSTER);
final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
table.getActiveTimeline(), table.getMetaClient().getInstantGenerator()).get();
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

Expand All @@ -549,6 +563,10 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
} finally {
this.txnManager.endTransaction(Option.of(clusteringInstant));
// Restore the old lock requirement if it was updated.
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
releaseResources(clusteringCommitTime);
}
WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
Expand Down Expand Up @@ -633,12 +651,18 @@ public Option<String> scheduleTableService(String instantTime, Option<Map<String
InstantGenerator instantGenerator = TimelineLayout.fromVersion(tableConfig.getTableVersion().getTimelineLayoutVersion()).getInstantGenerator();
final Option<HoodieInstant> inflightInstant = Option.of(instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
tableServiceType.getAction(), instantTime));
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
try {
this.txnManager.beginTransaction(inflightInstant, Option.empty());
LOG.info("Scheduling table service {} for table {}", tableServiceType, config.getBasePath());
return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType);
} finally {
this.txnManager.endTransaction(inflightInstant);
// Restore the old lock requirement if it was updated.
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
}
}

Expand Down Expand Up @@ -967,7 +991,7 @@ protected Boolean rollbackFailedWrites() {
}

protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback) {
rollbackFailedWrites(instantsToRollback, false, false);
rollbackFailedWrites(instantsToRollback, !config.isLockRequired(), false);
}

protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking, boolean skipVersionCheck) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
HoodieInstant inflightInstant = table.getMetaClient().createNewInstant(State.INFLIGHT, commitActionType, instantTime);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
this.txnManager.beginTransaction(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
Expand All @@ -251,6 +253,10 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
// Restore the old lock requirement if it was updated
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
releaseResources(instantTime);
}

Expand Down Expand Up @@ -994,6 +1000,17 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
}
}

/**
* Schedules a new compaction instant.
*
* @param extraMetadata Extra Metadata to be stored
* @param shouldLock whether to lock for time generation
*/
public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata, boolean shouldLock) throws HoodieIOException {
String instantTime = createNewInstantTime(shouldLock);
return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}

/**
* Schedules a new compaction instant.
* @param extraMetadata Extra Metadata to be stored
Expand Down Expand Up @@ -1308,11 +1325,17 @@ protected void doInitTable(WriteOperationType operationType, HoodieTableMetaClie
}

private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant, Runnable r) {
boolean oldLockRequired = txnManager.isLockRequired();
boolean isTxnManagerLockRequirementUpdated = updateTxnManagerLockRequirementIfNecessary(oldLockRequired);
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
r.run();
} finally {
this.txnManager.endTransaction(ownerInstant);
// Restore the old lock requirement if it was updated
if (isTxnManagerLockRequirementUpdated) {
txnManager.setIsLockRequired(oldLockRequired);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TransactionManager implements Serializable {

protected static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class);
protected final LockManager lockManager;
protected final boolean isLockRequired;
protected boolean isLockRequired;
protected Option<HoodieInstant> currentTxnOwnerInstant = Option.empty();
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();

Expand Down Expand Up @@ -105,4 +105,12 @@ public Option<HoodieInstant> getCurrentTransactionOwner() {
public boolean isLockRequired() {
return isLockRequired;
}

/**
* Lock requirement for transaction manager can change from within a transaction.
* For example, when a compaction is scheduled during upgrade, the lock requirement is set to false.
*/
public void setIsLockRequired(boolean isLockRequired) {
this.isLockRequired = isLockRequired;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,8 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) {
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build();
this.timeGeneratorConfig = HoodieTimeGeneratorConfig.newBuilder().fromProperties(props)
.withDefaultLockProvider(!isLockRequired()).build();
// Instant time generation needs a lock. If lock provider is not set, use default lock provider
.withDefaultLockProvider(!isLockProviderSet()).build();
this.indexingConfig = HoodieIndexingConfig.newBuilder().fromProperties(props).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex
try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
writeClient.rollbackFailedWrites();
if (shouldCompact) {
Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty());
Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty(), writeClient.getConfig().isLockRequired());
if (compactionInstantOpt.isPresent()) {
writeClient.compact(compactionInstantOpt.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.config.HoodieLockConfig
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
import org.apache.spark.sql.SaveMode
Expand All @@ -43,6 +45,7 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
"hoodie.metadata.enable" -> "false",
// "OverwriteWithLatestAvroPayload" is used to trigger merge mode upgrade/downgrade.
DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> classOf[InProcessLockProvider].getName,
DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name)

doWriteAndValidateDataAndRecordIndex(hudiOpts,
Expand Down
Loading