Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crc mismatch during deepstore upload retry task #14506

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1555,26 +1555,38 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
segmentName));
}

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
int iteration = 0;
// Round robin the servers until we find the one with the correct crc and successful upload
// If server is the last valid URI left then skip crc check as deepstore copy reliability takes a higher
// priority
for (URI uri: peerSegmentURIs) {
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d&expectedCrc=%d", serverUploadRequestUrl,
_deepstoreUploadRetryTimeoutMs,
(iteration == (peerSegmentURIs.size() - 1) ? -1 : segmentZKMetadata.getCrc()));
LOGGER.info("Ask server {} to upload LLC segment {} to deep store by this path: {}", uri, segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl;
try {
tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
} catch (Exception e) {
LOGGER.warn("Failed to upload LLC segment {} to deepstore from server {}", segmentName, uri);
iteration++;
continue;
}

String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
// Update segment ZK metadata by adding the download URL
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
}
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ public void testUploadToSegmentStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
// mock the request/response for 1st segment upload
String serverUploadRequestUrl0 =
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance0, adminPort,
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1&expectedCrc=-1", instance0, adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
// tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
// with a random UUID
Expand All @@ -998,7 +998,7 @@ public void testUploadToSegmentStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1);
// mock the request/response for 2nd segment upload
String serverUploadRequestUrl1 =
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance1, adminPort,
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1&expectedCrc=-1", instance1, adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl1)).thenThrow(
new HttpErrorStatusException("failed to upload segment",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ public String uploadLLCSegment(
String realtimeTableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
@QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs,
@QueryParam("expectedCrc") @DefaultValue("-1") long expectedCrc,
@Context HttpHeaders headers)
throws Exception {
realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers);
Expand Down Expand Up @@ -811,6 +812,14 @@ public String uploadLLCSegment(
Response.Status.NOT_FOUND);
}

// check if expected crc is the same as this server, this is to ensure ZK metadata crc matches deepstore CRC
if (expectedCrc != -1
&& expectedCrc != Long.parseLong(segmentDataManager.getSegment().getSegmentMetadata().getCrc())) {
throw new WebApplicationException(
String.format("Table %s segment %s crc does not match the expected crc", realtimeTableName, segmentName),
Response.Status.NOT_FOUND);
}

File segmentTarFile = null;
try {
// Create the tar.gz segment file in the server's segmentTarUploadDir folder with a unique file name.
Expand Down
Loading