Skip to content

Commit

Permalink
[BugFix] Fix online optimize table fail due to physical partition id …
Browse files Browse the repository at this point in the history
…changed

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Mar 3, 2025
1 parent 71e6df2 commit 391555a
Show file tree
Hide file tree
Showing 6 changed files with 829 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,27 @@ protected void runWaitingTxnJob() throws AlterCancelException {
LOG.info("transfer optimize job {} state to {}", jobId, this.jobState);
}

private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourcePartitionName, String tmpPartitionName) {
private void enableDoubleWritePartition(Database db, OlapTable tbl, String sourcePartitionName, String tempPartitionName) {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.WRITE);
try {
Preconditions.checkState(tbl.getState() == OlapTableState.OPTIMIZE);
tbl.addDoubleWritePartition(sourcePartitionName, tmpPartitionName);
LOG.info("job {} add double write partition {} to {}", jobId, tmpPartitionName, sourcePartitionName);
Partition temp = tbl.getPartition(tempPartitionName, true);
if (temp != null) {
Preconditions.checkState(temp.getSubPartitions().size() == 1);
Partition p = tbl.getPartition(sourcePartitionName);
if (p != null) {
Preconditions.checkState(p.getSubPartitions().size() == 1);
tbl.addDoubleWritePartition(p.getId(), temp.getId());

LOG.info("job {} add double write partition: {}:{} -> {}:{}", jobId, sourcePartitionName,
p.getId(), tempPartitionName, temp.getId());
} else {
LOG.warn("job {} add double partition {} does not exist", jobId, sourcePartitionName);
}
} else {
LOG.warn("job {} add double partition {} does not exist", jobId, tempPartitionName);
}
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
}
Expand Down
14 changes: 2 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,18 +434,8 @@ public void copyOnlyForQuery(OlapTable olapTable) {
olapTable.dbName = this.dbName;
}

public void addDoubleWritePartition(String sourcePartitionName, String tempPartitionName) {
Partition temp = tempPartitions.getPartition(tempPartitionName);
if (temp != null) {
Partition p = getPartition(sourcePartitionName);
if (p != null) {
doubleWritePartitions.put(p.getId(), temp.getId());
} else {
LOG.warn("partition {} does not exist", sourcePartitionName);
}
} else {
LOG.warn("partition {} does not exist", tempPartitionName);
}
public void addDoubleWritePartition(long sourcePartitionId, long tempPartitionId) {
doubleWritePartitions.put(sourcePartitionId, tempPartitionId);
}

public void clearDoubleWritePartition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ protected void unprotectedCommitPreparedTransaction(TransactionState transaction
PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
long partitionId = partitionCommitInfo.getPhysicalPartitionId();
PhysicalPartition partition = table.getPhysicalPartition(partitionId);
long parentPartitionId = partition.getParentId();
// partition maybe dropped between commit and publish version, ignore this error
if (partition == null) {
partitionCommitInfoIterator.remove();
Expand Down Expand Up @@ -1365,20 +1366,20 @@ protected void unprotectedCommitPreparedTransaction(TransactionState transaction
partitionCommitInfo.setVersionEpoch(partition.nextVersionEpoch());
}
} else {
// double write logic partition
Map<Long, Long> doubleWritePartitions = table.getDoubleWritePartitions();
if (doubleWritePartitions != null && !doubleWritePartitions.isEmpty()) {
// double write partition
if (doubleWritePartitions.containsValue(partitionId)) {
doubleWritePartitionCommitInfos.put(partitionId, partitionCommitInfo);
if (doubleWritePartitions.containsValue(parentPartitionId)) {
doubleWritePartitionCommitInfos.put(parentPartitionId, partitionCommitInfo);
} else {
// double write partition version is the same as the original partition
if (doubleWritePartitions.containsKey(partitionId)) {
doubleWritePartitionVersions.put(doubleWritePartitions.get(partitionId),
if (doubleWritePartitions.containsKey(parentPartitionId)) {
doubleWritePartitionVersions.put(doubleWritePartitions.get(parentPartitionId),
partition.getNextVersion());
}
partitionCommitInfo.setVersion(partition.getNextVersion());
LOG.info("set partition {} version to {} in transaction {}",
partitionId, partitionCommitInfo.getVersion(), transactionState);
LOG.info("set partition {}:{} version to {} in transaction {}",
parentPartitionId, partitionId, partitionCommitInfo.getVersion(), transactionState);
}
} else {
partitionCommitInfo.setVersion(partition.getNextVersion());
Expand Down Expand Up @@ -1406,8 +1407,9 @@ protected void unprotectedCommitPreparedTransaction(TransactionState transaction
if (partitionCommitInfo != null) {
partitionCommitInfo.setVersion(entry.getValue());
partitionCommitInfo.setIsDoubleWrite(true);
LOG.info("set double write partition {} version to {} in transaction {}",
entry.getKey(), entry.getValue(), transactionState);
LOG.info("set double write partition {}:{} version to {} in transaction {}",
entry.getKey(), table.getPartition(entry.getKey()).getDefaultPhysicalPartition().getId(),
entry.getValue(), transactionState);
}
}
}
Expand Down
Loading

0 comments on commit 391555a

Please sign in to comment.