From 8bdbebccc2d59270b71d6551659ce7a4004bf2ff Mon Sep 17 00:00:00 2001 From: Sumeet Varma Date: Fri, 10 Jan 2025 11:08:29 -0800 Subject: [PATCH] Parallel calls --- .../main/scala/org/apache/spark/sql/delta/Checksum.scala | 8 ++++---- .../apache/spark/sql/delta/OptimisticTransaction.scala | 2 -- .../main/scala/org/apache/spark/sql/delta/Snapshot.scala | 4 ++-- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 867fc2f038a..438d86cd950 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -139,8 +139,6 @@ trait RecordChecksum extends DeltaLogging { * @param deltaLog The DeltaLog * @param versionToCompute The version for which we want to compute the checksum * @param actions The actions corresponding to the version `versionToCompute` - * @param metadata The metadata corresponding to the version `versionToCompute` - * @param protocol The protocol corresponding to the version `versionToCompute` * @param operationName The operation name corresponding to the version `versionToCompute` * @param txnIdOpt The transaction identifier for the version `versionToCompute` * @param previousVersionState Contains either the versionChecksum corresponding to @@ -156,8 +154,6 @@ trait RecordChecksum extends DeltaLogging { deltaLog: DeltaLog, versionToCompute: Long, actions: Seq[Action], - metadata: Metadata, - protocol: Protocol, operationName: String, txnIdOpt: Option[String], previousVersionState: Either[Snapshot, VersionChecksum], @@ -213,6 +209,10 @@ trait RecordChecksum extends DeltaLogging { // Incrementally compute the new version checksum, if the old one is available. val ignoreAddFilesInOperation = RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName) + val protocol = + actions.collectFirst { case p: Protocol => p }.getOrElse(oldVersionChecksum.protocol) + val metadata = + actions.collectFirst { case m: Metadata => m }.getOrElse(oldVersionChecksum.metadata) val persistentDVsOnTableReadable = DeletionVectorUtils.deletionVectorsReadable(protocol, metadata) val persistentDVsOnTableWritable = diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 338cb7e42c3..b374206b6f8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -2553,8 +2553,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite deltaLog, attemptVersion, actions = currentTransactionInfo.finalActionsToCommit, - metadata = currentTransactionInfo.metadata, - protocol = currentTransactionInfo.protocol, operationName = currentTransactionInfo.op.name, txnIdOpt = Some(currentTransactionInfo.txnId), previousVersionState = scala.Left(snapshot), diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 930e1eed499..3a1bb5f3f19 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -384,7 +384,7 @@ class Snapshot( * checksum file. If the checksum file is not present or if the protocol or metadata is missing * this will return None. */ - protected def getProtocolMetadataAndIctFromCrc(): + protected def getProtocolMetadataAndIctFromCrc(checksumOpt: Option[VersionChecksum]): Option[Array[ReconstructedProtocolMetadataAndICT]] = { if (!spark.sessionState.conf.getConf( DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) { @@ -431,7 +431,7 @@ class Snapshot( Array[ReconstructedProtocolMetadataAndICT] = { import implicits._ - getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc => + getProtocolMetadataAndIctFromCrc(checksumOpt).foreach { protocolMetadataAndIctFromCrc => return protocolMetadataAndIctFromCrc }