Skip to content

Commit

Permalink
Upload v2 API implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Nov 27, 2024
1 parent 271c305 commit dc02f8c
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonProperty;


/**
 * Immutable value holder carrying three string attributes of an uploaded segment: its name, its CRC and its
 * download URL. The constructor parameters are bound to the JSON properties {@code segmentName},
 * {@code segmentCrc} and {@code downloadUrl} via {@link JsonProperty}.
 */
public class TableSegmentUploadV2Response {
  private final String _segmentName;
  private final String _segmentCrc;
  private final String _downloadUrl;

  /**
   * Creates a response from its three components. No validation is performed; values are stored as given.
   *
   * @param segmentName value of the {@code segmentName} JSON property
   * @param segmentCrc value of the {@code segmentCrc} JSON property
   * @param downloadUrl value of the {@code downloadUrl} JSON property
   */
  public TableSegmentUploadV2Response(
      @JsonProperty("segmentName") String segmentName,
      @JsonProperty("segmentCrc") String segmentCrc,
      @JsonProperty("downloadUrl") String downloadUrl) {
    _segmentName = segmentName;
    _segmentCrc = segmentCrc;
    _downloadUrl = downloadUrl;
  }

  /** @return the segment name supplied at construction time */
  public String getSegmentName() {
    return _segmentName;
  }

  /** @return the segment CRC, kept in its string form exactly as supplied */
  public String getSegmentCrc() {
    return _segmentCrc;
  }

  /** @return the download URL supplied at construction time */
  public String getDownloadUrl() {
    return _downloadUrl;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.TableSegmentUploadV2Response;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.common.utils.http.HttpClientConfig;
import org.apache.pinot.spi.auth.AuthProvider;
Expand Down Expand Up @@ -963,6 +964,32 @@ public String uploadToSegmentStore(String uri)
return downloadUrl;
}

/**
 * Used by controllers to send requests to servers: the controller periodic task uses this endpoint to ask a server
 * to upload a committed LLC segment to the segment store if it is missing.
 *
 * <p>Sends an HTTP/1.1 POST to {@code uri}, deserializes the response body into a
 * {@link TableSegmentUploadV2Response} and verifies that it carries a non-empty download URL.
 *
 * @param uri The uri to ask servers to upload segment to segment store
 * @return {@link TableSegmentUploadV2Response} - segment download url, crc, other metadata; never {@code null} and
 *         always with a non-empty download url
 * @throws URISyntaxException if {@code uri} cannot be parsed into a {@link URI}
 * @throws IOException propagated from sending the request or from parsing the response body
 * @throws HttpErrorStatusException if the request is rejected by the status code check, or if the response body
 *         yields no object or an object without a download url
 */
public TableSegmentUploadV2Response uploadToSegmentStoreV2(String uri)
    throws URISyntaxException, IOException, HttpErrorStatusException {
  ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1);
  // sendRequest checks the response status code
  SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
      _httpClient.sendRequest(requestBuilder.build(), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
  TableSegmentUploadV2Response uploadResponse =
      JsonUtils.stringToObject(response.getResponse(), TableSegmentUploadV2Response.class);
  // A body consisting of the JSON literal null can deserialize to a null object; treat that the same as a missing
  // download url so callers get the documented HttpErrorStatusException instead of a NullPointerException.
  String downloadUrl = uploadResponse != null ? uploadResponse.getDownloadUrl() : null;
  if (downloadUrl == null || downloadUrl.isEmpty()) {
    throw new HttpErrorStatusException(
        String.format("Returned segment download url is empty after requesting servers to upload by the path: %s",
            uri), response.getStatusCode());
  }
  return uploadResponse;
}

/**
* Send segment uri.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.restlet.resources.TableSegmentUploadV2Response;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand All @@ -82,7 +83,6 @@
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
Expand Down Expand Up @@ -1558,27 +1558,41 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String crcFromServer = getSegmentCrcFromServer(realtimeTableName, segmentName, uri.toString());
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(crcFromServer) != segmentZKMetadata.getCrc()) {
segmentZKMetadata.setCrc(Long.parseLong(crcFromServer));
try {
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "uploadV2");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
TableSegmentUploadV2Response tableSegmentUploadV2Response
= _fileUploadDownloadClient.uploadToSegmentStoreV2(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tableSegmentUploadV2Response.getDownloadUrl(), pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(tableSegmentUploadV2Response.getSegmentCrc()) != segmentZKMetadata.getCrc()) {
segmentZKMetadata.setCrc(Long.parseLong(tableSegmentUploadV2Response.getSegmentCrc()));
}
} catch (Exception e) {
// this is a fallback call for backward compatibility to the original API /upload in pinot-server
// should be deprecated in the long run
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
segmentZKMetadata.getDownloadUrl());
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
Expand All @@ -1599,16 +1613,6 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}

/**
 * Looks up the CRC of a segment by delegating to a freshly created {@code ServerSegmentMetadataReader} and
 * requires the lookup to yield a non-null value.
 *
 * @param tableNameWithType table name forwarded to the reader
 * @param segmentName segment name forwarded to the reader
 * @param endpoint server endpoint forwarded to the reader
 * @return the non-null CRC string returned by the reader; the state check fails when the reader returns
 *         {@code null}
 */
@VisibleForTesting
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
  String serverCrc =
      new ServerSegmentMetadataReader().getCrcForSegmentFromServer(tableNameWithType, segmentName, endpoint);
  // A null CRC means the reader could not obtain it; surface that as a failed state check
  Preconditions.checkState(serverCrc != null,
      "Failed to get CRC from endpoint %s for segment %s", endpoint, segmentName);
  return serverCrc;
}

@VisibleForTesting
boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
return _deepStoreUploadExecutorPendingSegments.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,24 +397,6 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName
return response;
}

/**
 * Returns the crc value of the segment hosted on the server.
 *
 * <p>Issues a GET against {@code <endpoint>/segments/<tableNameWithType>/<segmentName>/crc}, with the table and
 * segment names URL-encoded as UTF-8, and returns the response body as a string.
 *
 * @param tableNameWithType table name placed (URL-encoded) into the request path
 * @param segmentName segment name placed (URL-encoded) into the request path
 * @param endpoint base URL the request path is appended to
 * @return the response body, or {@code null} if any exception is raised while building or executing the request
 *         (the failure is logged, not rethrown)
 */
@Nullable
public String getCrcForSegmentFromServer(String tableNameWithType, String segmentName, String endpoint) {
  try {
    // build the url; keep the raw names untouched so the error log below shows the real segment name
    String encodedTableName = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
    String encodedSegmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
    String url = String.format("%s/segments/%s/%s/crc", endpoint, encodedTableName, encodedSegmentName);
    // The client is created per call, so it must be closed per call to release its underlying resources
    var client = ClientBuilder.newClient(new ClientConfig());
    try {
      return client.target(url).request(MediaType.APPLICATION_JSON).get(String.class);
    } finally {
      client.close();
    }
  } catch (Exception e) {
    // Trailing throwable argument keeps the stack trace in the log in addition to the message
    LOGGER.error("Error in fetching crc from server {} for segment {}: {}", endpoint, segmentName, e.getMessage(),
        e);
  }
  return null;
}

private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
Expand Down
Loading

0 comments on commit dc02f8c

Please sign in to comment.