Fix build semaphore deadlock (#15097)
* Adding support for passing null server metrics while building a segment. This is added for reingestion, which should not impact the normal server metrics.

* Releasing the semaphore to prevent a deadlock while segments are reingested and built (the release pattern is sketched below).

* Fixing the integration tests to catch the deadlock caused by not releasing the build semaphore.
9aman authored Feb 20, 2025
1 parent f9750e2 commit 1abc1de
Showing 4 changed files with 50 additions and 42 deletions.
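The core of the fix is in the fourth changed file below: the segment build and tar steps now run inside a try block whose finally clause releases the build semaphore, so the permit is returned whether the build succeeds or fails. What follows is a minimal, hypothetical sketch of that acquire/try/finally pattern; the class and helper names (SegmentBuildSketch, convertAndTar) are illustrative and are not the actual Pinot classes.

import java.io.File;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: SegmentBuildSketch and convertAndTar are hypothetical names.
public class SegmentBuildSketch {
  // Shared permit that limits concurrent segment builds, as in the data manager.
  private final Semaphore _segBuildSemaphore = new Semaphore(1);

  public File buildSegment() throws InterruptedException {
    // Acquire the permit first; the real code retries with a growing timeout and logs progress.
    while (!_segBuildSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
      System.out.println("Still waiting for the segment build semaphore");
    }
    try {
      // Convert the in-memory segment and tar it. Before the fix, this method could
      // return or throw without ever releasing the semaphore, stalling later builds.
      return convertAndTar();
    } finally {
      // Always return the permit, on success or failure.
      _segBuildSemaphore.release();
    }
  }

  private File convertAndTar() {
    // Stand-in for RealtimeSegmentConverter#build plus tarring of the index directory.
    return new File(System.getProperty("java.io.tmpdir"), "segment.tar.gz");
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(new SegmentBuildSketch().buildSegment());
  }
}
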
Changed file 1 of 4: realtime ingestion failure integration test
@@ -53,6 +53,7 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.integration.tests.realtime.utils.FailureInjectingRealtimeTableDataManager.MAX_NUMBER_OF_FAILURES;
import static org.apache.pinot.spi.stream.StreamConfigProperties.SEGMENT_COMPLETION_FSM_SCHEME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -162,7 +163,8 @@ public void setUp()
addTableConfig(tableConfig);
Thread.sleep(60000L);
TestUtils.waitForCondition(
(aVoid) -> atLeastOneErrorSegmentInExternalView(TableNameBuilder.REALTIME.tableNameWithType(getTableName())),
(aVoid) -> numberOfErrorSegmentInExternalView(TableNameBuilder.REALTIME.tableNameWithType(getTableName()))
== MAX_NUMBER_OF_FAILURES,
1000, 600000, "Segments still not in error state");
}

@@ -275,18 +277,19 @@ private void verifyIdealState(String tableName, int numSegmentsExpected) {
assertEquals(segmentAssignment.size(), numSegmentsExpected);
}

private boolean atLeastOneErrorSegmentInExternalView(String tableName) {
private int numberOfErrorSegmentInExternalView(String tableName) {
int errorSegmentCount = 0;
ExternalView resourceEV = _helixResourceManager.getHelixAdmin()
.getResourceExternalView(_helixResourceManager.getHelixClusterName(), tableName);
Map<String, Map<String, String>> segmentAssigment = resourceEV.getRecord().getMapFields();
for (Map<String, String> serverToStateMap : segmentAssigment.values()) {
for (String state : serverToStateMap.values()) {
if (state.equals("ERROR")) {
return true;
errorSegmentCount++;
}
}
}
return false;
return errorSegmentCount;
}

private void assertUploadUrlEmpty(List<SegmentZKMetadata> segmentZKMetadataList) {
Changed file 2 of 4: FailureInjectingRealtimeTableDataManager
@@ -19,6 +19,7 @@
package org.apache.pinot.integration.tests.realtime.utils;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -36,7 +37,8 @@


public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataManager {
private volatile boolean _hasFailedOnce = false;
public static final int MAX_NUMBER_OF_FAILURES = 10;
private final AtomicInteger _numberOfFailures = new AtomicInteger(0);

public FailureInjectingRealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
this(segmentBuildSemaphore, () -> true);
@@ -55,13 +57,8 @@ protected RealtimeSegmentDataManager createRealtimeSegmentDataManager(SegmentZKM
throws AttemptsExceededException, RetriableOperationException {

boolean addFailureToCommits = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);

if (addFailureToCommits) {
if (_hasFailedOnce) {
addFailureToCommits = false;
} else {
_hasFailedOnce = true;
}
if (addFailureToCommits && _numberOfFailures.getAndIncrement() >= MAX_NUMBER_OF_FAILURES) {
addFailureToCommits = false;
}
return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, addFailureToCommits);
Changed file 3 of 4: RealtimeSegmentConverter
@@ -70,7 +70,7 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro
}
}

public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetrics serverMetrics)
throws Exception {
SegmentGeneratorConfig genConfig = new SegmentGeneratorConfig(_tableConfig, _dataSchema, true);

@@ -116,7 +116,7 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
}
}

if (segmentPartitionConfig != null) {
if (segmentPartitionConfig != null && serverMetrics != null) {
Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
for (String columnName : columnPartitionMap.keySet()) {
int numPartitions = driver.getSegmentStats().getColumnProfileFor(columnName).getPartitions().size();
Changed file 4 of 4: realtime segment data manager (buildSegment)
@@ -333,7 +333,7 @@ public File buildSegment() {
long startTimeMs = now();
try {
if (_segBuildSemaphore != null) {
_logger.info("Trying to acquire semaphore for building segment");
_logger.info("Trying to acquire semaphore for building segment: {}", _segmentName);
Instant acquireStart = Instant.now();
int timeoutSeconds = 5;
while (!_segBuildSemaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) {
@@ -345,36 +345,44 @@
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for segment build semaphore", e);
}
long lockAcquireTimeMs = now();
_logger.info("Acquired lock for building segment in {} ms", lockAcquireTimeMs - startTimeMs);

// Build a segment from in-memory rows.
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset(_startOffset.toString());
segmentZKPropsConfig.setEndOffset(_endOffset.toString());

// Build the segment
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, _resourceTmpDir.getAbsolutePath(), _schema,
_tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(),
_tableConfig.getIndexingConfig().isNullHandlingEnabled());
try {
converter.build(null, null);
} catch (Exception e) {
throw new RuntimeException("Failed to build segment", e);
}
_logger.info("Successfully built segment (Column Mode: {}) in {} ms", converter.isColumnMajorEnabled(),
now() - lockAcquireTimeMs);
long lockAcquireTimeMs = now();
_logger.info("Acquired lock for building segment in {} ms", lockAcquireTimeMs - startTimeMs);

// Build a segment from in-memory rows.
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset(_startOffset.toString());
segmentZKPropsConfig.setEndOffset(_endOffset.toString());

// Build the segment
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, _resourceTmpDir.getAbsolutePath(),
_schema,
_tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(),
_tableConfig.getIndexingConfig().isNullHandlingEnabled());
try {
converter.build(null, null);
} catch (Exception e) {
throw new RuntimeException("Failed to build segment", e);
}
_logger.info("Successfully built segment (Column Mode: {}) in {} ms", converter.isColumnMajorEnabled(),
now() - lockAcquireTimeMs);

File indexDir = new File(_resourceTmpDir, _segmentName);
File segmentTarFile = new File(_resourceTmpDir, _segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
try {
TarCompressionUtils.createCompressedTarFile(new File(_resourceTmpDir, _segmentName), segmentTarFile);
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while tarring index directory from: " + indexDir + " to: " + segmentTarFile, e);
File indexDir = new File(_resourceTmpDir, _segmentName);
File segmentTarFile = new File(_resourceTmpDir, _segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
try {
TarCompressionUtils.createCompressedTarFile(new File(_resourceTmpDir, _segmentName), segmentTarFile);
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while tarring index directory from: " + indexDir + " to: " + segmentTarFile, e);
}
return segmentTarFile;
} finally {
if (_segBuildSemaphore != null) {
_logger.info("Releasing semaphore for building segment");
_segBuildSemaphore.release();
}
}
return segmentTarFile;
}

protected long now() {