Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Jan 10, 2025
1 parent a920885 commit 8bdbebc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ trait RecordChecksum extends DeltaLogging {
* @param deltaLog The DeltaLog
* @param versionToCompute The version for which we want to compute the checksum
* @param actions The actions corresponding to the version `versionToCompute`
* @param metadata The metadata corresponding to the version `versionToCompute`
* @param protocol The protocol corresponding to the version `versionToCompute`
* @param operationName The operation name corresponding to the version `versionToCompute`
* @param txnIdOpt The transaction identifier for the version `versionToCompute`
* @param previousVersionState Contains either the versionChecksum corresponding to
Expand All @@ -156,8 +154,6 @@ trait RecordChecksum extends DeltaLogging {
deltaLog: DeltaLog,
versionToCompute: Long,
actions: Seq[Action],
metadata: Metadata,
protocol: Protocol,
operationName: String,
txnIdOpt: Option[String],
previousVersionState: Either[Snapshot, VersionChecksum],
Expand Down Expand Up @@ -213,6 +209,10 @@ trait RecordChecksum extends DeltaLogging {
// Incrementally compute the new version checksum, if the old one is available.
val ignoreAddFilesInOperation =
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
val protocol =
actions.collectFirst { case p: Protocol => p }.getOrElse(oldVersionChecksum.protocol)
val metadata =
actions.collectFirst { case m: Metadata => m }.getOrElse(oldVersionChecksum.metadata)
val persistentDVsOnTableReadable =
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
val persistentDVsOnTableWritable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2553,8 +2553,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
deltaLog,
attemptVersion,
actions = currentTransactionInfo.finalActionsToCommit,
metadata = currentTransactionInfo.metadata,
protocol = currentTransactionInfo.protocol,
operationName = currentTransactionInfo.op.name,
txnIdOpt = Some(currentTransactionInfo.txnId),
previousVersionState = scala.Left(snapshot),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class Snapshot(
* checksum file. If the checksum file is not present or if the protocol or metadata is missing
* this will return None.
*/
protected def getProtocolMetadataAndIctFromCrc():
protected def getProtocolMetadataAndIctFromCrc(checksumOpt: Option[VersionChecksum]):
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
Expand Down Expand Up @@ -431,7 +431,7 @@ class Snapshot(
Array[ReconstructedProtocolMetadataAndICT] = {
import implicits._

getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
getProtocolMetadataAndIctFromCrc(checksumOpt).foreach { protocolMetadataAndIctFromCrc =>
return protocolMetadataAndIctFromCrc
}

Expand Down

0 comments on commit 8bdbebc

Please sign in to comment.