Move id converter to router for put and ttl update handler logic #2983

Draft. Wants to merge 3 commits into master.
(file name not shown)
@@ -134,8 +134,9 @@ public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties
}

@Override
public Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetadata, List<ChunkInfo> chunksToStitch,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
public Future<String> stitchBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata,
List<ChunkInfo> chunksToStitch, PutBlobOptions options, Callback<String> callback,
QuotaChargeCallback quotaChargeCallback) {
throw new UnsupportedOperationException("stichBlob is not supported by this mock");
}

20 changes: 20 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/RouterConfig.java
@@ -17,6 +17,7 @@
import com.github.ambry.utils.Utils;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;


@@ -149,6 +150,7 @@ public class RouterConfig {
public static final String ROUTER_OPERATION_TRACKER_CHECK_ALL_ORIGINATING_REPLICAS_FOR_NOT_FOUND =
"router.operation.tracker.check.all.originating.replicas.for.not.found";
public static final String RESERVED_METADATA_ENABLED = "router.reserved.metadata.enabled";
public static final String CLUSTERMAP_CLUSTER_NAME = ClusterMapConfig.CLUSTERMAP_CLUSTER_NAME;
public static final String ROUTER_GET_OPERATION_DEPRIORITIZE_BOOTSTRAP_REPLICAS =
"router.get.operation.deprioritize.bootstrap.replicas";

@@ -781,15 +783,33 @@ public class RouterConfig {
*/
public final String namedBlobDbFactory;

/**
* This is set from frontendConfig until the id converter has been fully migrated to the router.
*/
public final List<String> pathPrefixesToRemove;

/**
* This is set in clusterMapConfig.
*/
@Config(CLUSTERMAP_CLUSTER_NAME)
public String clusterName;

/**
* Create a RouterConfig instance.
* @param verifiableProperties the properties map to refer to.
*/
public RouterConfig(VerifiableProperties verifiableProperties) {
FrontendConfig frontendConfig = new FrontendConfig(verifiableProperties);
Properties clusterMapProperty = new Properties();
clusterMapProperty.setProperty("clustermap.cluster.name", "test");
clusterMapProperty.setProperty("clustermap.datacenter.name", "dcName");
clusterMapProperty.setProperty("clustermap.host.name", "localhost");
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(clusterMapProperty));
idConverterFactory = frontendConfig.idConverterFactory;
idSigningServiceFactory = frontendConfig.idSigningServiceFactory;
namedBlobDbFactory = frontendConfig.namedBlobDbFactory;
pathPrefixesToRemove = frontendConfig.pathPrefixesToRemove;
clusterName = clusterMapConfig.clusterMapClusterName;
routerBlobMetadataCacheId =
verifiableProperties.getString(ROUTER_BLOB_METADATA_CACHE_ID, "routerBlobMetadataCache");
routerMaxNumMetadataCacheEntries =
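A minimal usage sketch of the constructor above, to make the cluster-name behavior explicit. The property values are placeholders, and it is assumed that any other properties RouterConfig or FrontendConfig require are supplied or defaulted; none of this is code from the PR:

```java
// Illustrative only: placeholder values; other required router/frontend properties
// are assumed to be present or defaulted.
Properties props = new Properties();
props.setProperty("clustermap.cluster.name", "prod-cluster"); // supplied by the caller
RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(props));
// With the constructor above, routerConfig.clusterName resolves to "test", because the
// ClusterMapConfig is built from the hardcoded Properties inside the constructor rather
// than from the caller's verifiableProperties.
```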
5 changes: 5 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
@@ -516,6 +516,11 @@ public static final class InternalKeys {
* content-length header
*/
public static final String CONTENT_RANGE_LENGTH = KEY_PREFIX + "content-range-length";

/**
* Used to determine whether a ttl update needs to go through id conversion to get the blobId from the blob name.
*/
public static final String BLOB_ID = KEY_PREFIX + "blob-id";
}

/**
25 changes: 14 additions & 11 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
@@ -68,18 +68,21 @@ Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, b
* Requests for a new metadata blob to be put asynchronously and invokes the {@link Callback} when the request
* completes. This metadata blob will contain references to the chunks provided as an argument. The blob ID returned
* by this operation can be used to fetch the chunks as if they were a single blob.
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored. The
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param chunksToStitch the list of data chunks to stitch together. The router will treat the metadata in the
* {@link ChunkInfo} object as a source of truth, so the caller should ensure that these
* fields are set accurately.
* @param options the {@link PutBlobOptions}.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
*
* @param restRequest The {@link RestRequest} associated with this stitch operation.
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored.
* The channel is consumed fully, and the size of the blob is the number of bytes read from
* it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param chunksToStitch the list of data chunks to stitch together. The router will treat the metadata in the
* {@link ChunkInfo} object as a source of truth, so the caller should ensure that these
* fields are set accurately.
* @param options the {@link PutBlobOptions}.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @return A future that would contain the BlobId eventually.
*/
Future<String> stitchBlob(BlobProperties blobProperties, byte[] userMetadata, List<ChunkInfo> chunksToStitch,
Future<String> stitchBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, List<ChunkInfo> chunksToStitch,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback);

/**
@@ -163,7 +166,7 @@ default CompletableFuture<GetBlobResult> getBlob(String blobId, GetBlobOptions o
default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte[] userMetadata,
List<ChunkInfo> chunksToStitch) {
CompletableFuture<String> future = new CompletableFuture<>();
stitchBlob(blobProperties, userMetadata, chunksToStitch, null, CallbackUtils.fromCompletableFuture(future), null);
stitchBlob(null, blobProperties, userMetadata, chunksToStitch, null, CallbackUtils.fromCompletableFuture(future), null);
return future;
}

@@ -183,7 +186,7 @@ default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte
default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte[] userMetadata,
List<ChunkInfo> chunksToStitch, PutBlobOptions options) {
CompletableFuture<String> future = new CompletableFuture<>();
stitchBlob(blobProperties, userMetadata, chunksToStitch, options, CallbackUtils.fromCompletableFuture(future),
stitchBlob(null, blobProperties, userMetadata, chunksToStitch, options, CallbackUtils.fromCompletableFuture(future),
null);
return future;
}
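For clarity, a hedged sketch of how a frontend handler would call the updated signature; the identifiers are placeholders and PutBlobOptions.DEFAULT is assumed to be available. The CompletableFuture convenience overloads above pass null for the RestRequest, so callers without a request context keep working:

```java
// Illustrative caller of the new stitchBlob signature (not code from this PR).
Future<String> blobIdFuture = router.stitchBlob(
    restRequest,            // new leading parameter: the originating RestRequest
    blobProperties,
    userMetadata,
    chunksToStitch,
    PutBlobOptions.DEFAULT, // assumed default options constant
    (blobId, exception) -> { /* handle completion */ },
    quotaChargeCallback);
```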
(file name not shown)
@@ -262,7 +262,6 @@ public void postGetHeadUpdateDeleteUndeleteTest() throws Exception {
}
}

@Ignore
@Test
public void datasetTest() throws Exception {
Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount();
@@ -300,7 +299,6 @@ public void datasetTest() throws Exception {
doListDatasetAndVerify(refAccount.getName(), namedBlobOptionalContainer.getName(), new ArrayList<>());
}

@Ignore
@Test
public void datasetOutOfRetentionTest() throws Exception {
Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount();
@@ -343,7 +341,6 @@ public void datasetOutOfRetentionTest() throws Exception {
assertEquals("Should only have 1 version", 1, datasetVersions.size());
}

@Ignore
@Test
public void datasetRenameTest() throws Exception {
Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount();
@@ -369,7 +366,6 @@ public void datasetRenameTest() throws Exception {
doListDatasetVersionAndVerify(datasetList, new ArrayList<>());
}

@Ignore
@Test
public void datasetTtlTest() throws Exception {
Account refAccount = ACCOUNT_SERVICE.createAndAddRandomAccount();
(file name not shown)
@@ -64,6 +64,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -444,7 +445,7 @@ List<Dataset> doDatasetPutUpdateGetTest(Account account, Container container, Lo
versionSchemas.add(MONOTONIC);
versionSchemas.add(SEMANTIC_LONG);
for (Dataset.VersionSchema versionSchema : versionSchemas) {
String datasetName = "zzzz" + TestUtils.getRandomString(10);
String datasetName = "datasetName" + TestUtils.getRandomString(100);
Dataset dataset;
if (ttl == null) {
dataset = new DatasetBuilder(accountName, containerName, datasetName).setVersionSchema(versionSchema).build();
@@ -463,7 +464,7 @@ List<Dataset> doDatasetPutUpdateGetTest(Account account, Container container, Lo
getDatasetAndVerify(dataset, getHeaders);

//Update dataset
Dataset datasetToUpdate = new DatasetBuilder(dataset).setRetentionCount(10).build();
Dataset datasetToUpdate = new DatasetBuilder(dataset).setRetentionCount(50).build();
putDatasetAndVerify(datasetToUpdate, headers, true);
getDatasetAndVerify(datasetToUpdate, getHeaders);
datasetList.add(datasetToUpdate);
@@ -505,13 +506,17 @@ List<Pair<String, String>> doDatasetVersionPutGetWithTtlTest(Account account, Co
String accountName = account.getName();
List<Pair<String, String>> datasetVersions = new ArrayList<>();
for (Dataset dataset : datasets) {
Set<String> datasetVersionSet = new HashSet<>();
for (long ttl : new long[]{-1, TTL_SECS}) {
//Test put dataset version with default dataset level ttl
HttpHeaders headers = new DefaultHttpHeaders();
setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null);
headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true);
ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(contentSize));
String version = generateDatasetVersion(dataset);
String version;
do {
version = generateDatasetVersion(dataset);
} while (!datasetVersionSet.add(version));
if (dataset.getRetentionTimeInSeconds() == null) {
putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl);
datasetVersions.add(new Pair<>(dataset.getDatasetName(), version));
@@ -526,7 +531,9 @@ List<Pair<String, String>> doDatasetVersionPutGetWithTtlTest(Account account, Co
setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null);
headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true);
headers.add(RestUtils.Headers.DATASET_VERSION_TTL_ENABLED, true);
version = generateDatasetVersion(dataset);
do {
version = generateDatasetVersion(dataset);
} while (!datasetVersionSet.add(version));
putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl);
datasetVersions.add(new Pair<>(dataset.getDatasetName(), version));
}
@@ -617,14 +624,18 @@ List<Pair<String, String>> doDatasetVersionPutGetTest(Account account, Container
String containerName = container.getName();
List<Pair<String, String>> datasetVersions = new ArrayList<>();
for (Dataset dataset : datasets) {
Set<String> datasetVersionSet = new HashSet<>();
for (long ttl : new long[]{-1, TTL_SECS}) {
//Test put dataset version
HttpHeaders headers = new DefaultHttpHeaders();
setAmbryHeadersForPut(headers, ttl, false, accountName, contentType, ownerId, null, null);
headers.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true);
int contentSize = 100;
ByteBuffer content = ByteBuffer.wrap(TestUtils.getRandomBytes(contentSize));
String version = generateDatasetVersion(dataset);
String version;
do {
version = generateDatasetVersion(dataset);
} while (!datasetVersionSet.add(version));
String blobId = putDatasetVersionAndVerify(dataset, version, headers, content, contentSize, ttl);

// This is the blob id for the given blob name, we should be able to do all get operations on this blob id.
@@ -657,14 +668,18 @@ List<Pair<String, String>> doStitchDatasetVersionGetTest(Account account, Contai
String containerName = container.getName();
List<Pair<String, String>> datasetVersions = new ArrayList<>();
for (Dataset dataset : datasets) {
Set<String> datasetVersionSet = new HashSet<>();
for (long ttl : new long[]{-1, TTL_SECS}) {
//Test stitch
HttpHeaders stitchHeaders = new DefaultHttpHeaders();
setAmbryHeadersForPut(stitchHeaders, ttl, !container.isCacheable(), "stitcher", contentType, ownerId,
account.getName(), container.getName());
stitchHeaders.add(RestUtils.Headers.DATASET_VERSION_QUERY_ENABLED, true);
stitchHeaders.add(RestUtils.Headers.UPLOAD_NAMED_BLOB_MODE, "STITCH");
String version = generateDatasetVersion(dataset);
String version;
do {
version = generateDatasetVersion(dataset);
} while (!datasetVersionSet.add(version));
String stitchedBlobId =
doDatasetStitchAndVerify(account, container, dataset, version, stitchHeaders, signedChunkIds,
stitchedBlobSize, ttl);
@@ -959,15 +974,15 @@ private String generateDatasetVersion(Dataset dataset) {
} else if (MONOTONIC.equals(datasetVersionSchema)) {
version = String.valueOf(random.nextInt(10000));
} else if (SEMANTIC.equals(datasetVersionSchema)) {
int major = random.nextInt(100);
int minor = random.nextInt(100);
int patch = random.nextInt(100);
int major = random.nextInt(1000);
int minor = random.nextInt(1000);
int patch = random.nextInt(1000);
version = major + "." + minor + "." + patch;
} else if (SEMANTIC_LONG.equals(datasetVersionSchema)) {
int major = random.nextInt(100);
int minor = random.nextInt(100);
int patch = random.nextInt(100);
int revision = random.nextInt(100);
int major = random.nextInt(1000);
int minor = random.nextInt(1000);
int patch = random.nextInt(1000);
int revision = random.nextInt(1000);
version = major + "." + minor + "." + patch + "." + revision;
} else {
throw new IllegalArgumentException("This type of version schema is not compatible");
@@ -980,7 +995,7 @@
*/
private void sleep() {
try {
Thread.sleep(1);
Thread.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException("Sleep was interrupted", e);
}
(file name not shown)
@@ -128,8 +128,15 @@ public void injectAccountAndContainerForDatasetRequest(RestRequest restRequest)
public void injectAccountContainerForNamedBlob(RestRequest restRequest, RestRequestMetricsGroup metricsGroup)
throws RestServiceException {
accountAndContainerSanityCheck(restRequest);

NamedBlobPath namedBlobPath = NamedBlobPath.parse(getRequestPath(restRequest), restRequest.getArgs());
NamedBlobPath namedBlobPath;
//For ttl update, the named blob path is defined in RestUtils.Headers.BLOB_ID
String blobIdStr = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.BLOB_ID, false);
if (blobIdStr != null) {
blobIdStr = RestUtils.stripSlashAndExtensionFromId(blobIdStr);
namedBlobPath = NamedBlobPath.parse(blobIdStr, restRequest.getArgs());
} else {
namedBlobPath = NamedBlobPath.parse(getRequestPath(restRequest), restRequest.getArgs());
}
String accountName = namedBlobPath.getAccountName();
Account targetAccount = accountService.getAccountByName(accountName);
if (targetAccount == null) {
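A hedged sketch of the ttl-update flow implied by this change; the identifiers and the call site are assumptions, not code from this PR. The handler places the named-blob path under the BLOB_ID key so the injector resolves the account and container from it rather than from the request path:

```java
// Hypothetical ttl-update path (illustrative identifiers).
restRequest.setArg(RestUtils.Headers.BLOB_ID, "/named/my-account/my-container/my-blob-name");
accountAndContainerInjector.injectAccountContainerForNamedBlob(restRequest, metricsGroup);
// injectAccountContainerForNamedBlob now strips the slash/extension and parses the
// account and container names from the stored path, as in the hunk above.
```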
(file name not shown)
@@ -282,9 +282,9 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChannel channel, BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putReadStitchRequestMetrics, bytesRead -> {
BlobProperties propertiesForRouterUpload = getPropertiesForRouterUpload(blobInfo);
router.stitchBlob(propertiesForRouterUpload, blobInfo.getUserMetadata(),
router.stitchBlob(restRequest, propertiesForRouterUpload, blobInfo.getUserMetadata(),
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)), null,
routerStitchBlobCallback(blobInfo, propertiesForRouterUpload),
routerStitchBlobCallback(blobInfo),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}, uri, LOGGER, deleteDatasetCallback);
}
@@ -293,34 +293,20 @@ private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChan
* After {@link Router#putBlob} finishes, call {@link IdConverter#convert} to convert the returned ID into a format
* that will be returned in the "Location" header.
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @param propertiesPassedInRouterUpload the {@link BlobProperties} instance that is passed to Router during upload
* @return a {@link Callback} to be used with {@link Router#putBlob}.
*/
private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo,
BlobProperties propertiesPassedInRouterUpload) {
private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putRouterStitchBlobMetrics, blobId -> {
// The actual blob size is now present in the instance of BlobProperties passed to the router.stitchBlob().
// Update it in the BlobInfo so that IdConverter can add it to the named blob DB
blobInfo.getBlobProperties().setBlobSize(propertiesPassedInRouterUpload.getBlobSize());
idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId));
}, uri, LOGGER, deleteDatasetCallback);
}

/**
* After {@link IdConverter#convert} finishes, call {@link SecurityService#postProcessRequest} to perform
* request time security checks that rely on the request being fully parsed and any additional arguments set.
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @return a {@link Callback} to be used with {@link IdConverter#convert}.
*/
private Callback<String> idConverterCallback(BlobInfo blobInfo, String blobId) {
return buildCallback(frontendMetrics.putIdConversionMetrics, convertedBlobId -> {
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, convertedBlobId);
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, blobId);
String blobIdClean = stripPrefixAndExtension(blobId);
if (blobInfo.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Do ttl update with retryExecutor. Use the blob ID returned from the router instead of the converted ID
// since the converted ID may be changed by the ID converter.
String serviceId = blobInfo.getBlobProperties().getServiceId();
retryExecutor.runWithRetries(retryPolicy,
callback -> router.updateBlobTtl(null, blobId, serviceId, Utils.Infinite_Time, callback,
callback -> router.updateBlobTtl(restRequest, blobIdClean, serviceId, Utils.Infinite_Time, callback,
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)), this::isRetriable,
routerTtlUpdateCallbackForStitch(blobInfo, blobId));
} else {
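One plausible condensed reading of the hunk above, shown as a sketch rather than the actual handler code: the blob id returned by the router is used directly for the Location header, and the follow-up ttl update now passes the RestRequest to the router, which owns id conversion. Helper names such as stripPrefixAndExtension come from the diff; the rest are placeholders:

```java
// Condensed, illustrative view of the updated stitch flow (error handling and
// retryExecutor wiring elided).
router.stitchBlob(restRequest, blobProperties, userMetadata, chunksToStitch, null,
    (blobId, exception) -> {
      if (exception == null) {
        restResponseChannel.setHeader(RestUtils.Headers.LOCATION, blobId);
        String blobIdClean = stripPrefixAndExtension(blobId);
        if (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
          router.updateBlobTtl(restRequest, blobIdClean, blobProperties.getServiceId(),
              Utils.Infinite_Time, ttlUpdateCallback, quotaChargeCallback);
        }
      }
    },
    quotaChargeCallback);
```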