diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
index f601e9a3e6..16bd843941 100644
--- a/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
+++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/VcrReplicationManager.java
@@ -368,7 +368,7 @@ public VcrMetrics getVcrMetrics() {
   }
 
   /** For testing only */
-  CloudStorageCompactor getCloudStorageCompactor() {
+  public CloudStorageCompactor getCloudStorageCompactor() {
     return cloudStorageCompactor;
   }
 
diff --git a/ambry-vcr/src/main/java/com/github/ambry/vcr/AmbryCloudRequests.java b/ambry-vcr/src/main/java/com/github/ambry/vcr/AmbryCloudRequests.java
new file mode 100644
index 0000000000..b432b2902b
--- /dev/null
+++ b/ambry-vcr/src/main/java/com/github/ambry/vcr/AmbryCloudRequests.java
@@ -0,0 +1,418 @@
+/**
+ * Copyright 2022 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.github.ambry.vcr;
+
+import com.codahale.metrics.MetricRegistry;
+import com.github.ambry.cloud.CloudBlobStore;
+import com.github.ambry.clustermap.ClusterMap;
+import com.github.ambry.clustermap.DataNodeId;
+import com.github.ambry.commons.BlobId;
+import com.github.ambry.commons.ErrorMapping;
+import com.github.ambry.messageformat.MessageFormatException;
+import com.github.ambry.messageformat.MessageFormatSend;
+import com.github.ambry.messageformat.MessageFormatWriteSet;
+import com.github.ambry.network.LocalRequestResponseChannel;
+import com.github.ambry.network.NetworkRequest;
+import com.github.ambry.network.RequestResponseChannel;
+import com.github.ambry.protocol.DeleteRequest;
+import com.github.ambry.protocol.DeleteResponse;
+import com.github.ambry.protocol.GetRequest;
+import com.github.ambry.protocol.GetResponse;
+import com.github.ambry.protocol.PartitionRequestInfo;
+import com.github.ambry.protocol.PartitionResponseInfo;
+import com.github.ambry.protocol.PutRequest;
+import com.github.ambry.protocol.PutResponse;
+import com.github.ambry.protocol.RequestOrResponseType;
+import com.github.ambry.protocol.TtlUpdateRequest;
+import com.github.ambry.protocol.TtlUpdateResponse;
+import com.github.ambry.protocol.UndeleteRequest;
+import com.github.ambry.protocol.UndeleteResponse;
+import com.github.ambry.server.AmbryRequests;
+import com.github.ambry.server.ServerErrorCode;
+import com.github.ambry.server.StoreManager;
+import com.github.ambry.store.IdUndeletedStoreException;
+import com.github.ambry.store.MessageInfo;
+import com.github.ambry.store.StoreErrorCodes;
+import com.github.ambry.store.StoreException;
+import com.github.ambry.store.StoreGetOptions;
+import com.github.ambry.utils.Utils;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The request implementation class for handling requests to Cloud store. All requests from cloud router will be handled
+ * by this class.
+ */
+public class AmbryCloudRequests extends AmbryRequests {
+
+  private static final Logger logger = LoggerFactory.getLogger(AmbryCloudRequests.class);
+
+  /**
+   * @param storeManager {@link StoreManager} object to get stores for replicas
+   * @param requestResponseChannel the {@link RequestResponseChannel} to receive requests and send responses.
+   * @param clusterMap the {@link ClusterMap} of the cluster
+   * @param nodeId the data node Id.
+   * @param registry the {@link MetricRegistry}
+   */
+  public AmbryCloudRequests(StoreManager storeManager, RequestResponseChannel requestResponseChannel,
+      ClusterMap clusterMap, DataNodeId nodeId, MetricRegistry registry) {
+    super(storeManager, requestResponseChannel, clusterMap, nodeId, registry, null, null, null, null, null, null);
+  }
+
+  @Override
+  public void handlePutRequest(NetworkRequest request) throws InterruptedException {
+
+    if (!(request instanceof LocalRequestResponseChannel.LocalChannelRequest)) {
+      throw new IllegalArgumentException("The request must be of LocalChannelRequest type");
+    }
+
+    PutRequest receivedRequest;
+
+    // This is a case where handlePutRequest is called when frontends are writing to Azure. In this case, this method
+    // is called by request handler threads running within the frontend router itself. So, the request can be directly
+    // referenced as java objects without any need for deserialization.
+    PutRequest sentRequest =
+        (PutRequest) ((LocalRequestResponseChannel.LocalChannelRequest) request).getRequestInfo().getRequest();
+
+    // However, we will create a new PutRequest object to represent the received Put request since the blob content
+    // 'buffer' in PutRequest is accessed as 'stream' while writing to Store. Also, crc value for this request
+    // would be null since it is only calculated (on the fly) when sending the request to network. It might be okay to
+    // use null crc here since the scenario for which we are using crc (i.e. possibility of collisions due to fast
+    // replication) as described in this PR https://github.com/linkedin/ambry/pull/549 might not be applicable when
+    // frontends are talking to Azure.
+    receivedRequest = new PutRequest(sentRequest.getCorrelationId(), sentRequest.getClientId(), sentRequest.getBlobId(),
+        sentRequest.getBlobProperties(), sentRequest.getUsermetadata(), sentRequest.getBlobSize(),
+        sentRequest.getBlobType(), sentRequest.getBlobEncryptionKey(), new ByteBufInputStream(sentRequest.getBlob()),
+        null);
+
+    AtomicReference<PutResponse> response = new AtomicReference<>();
+    try {
+      ServerErrorCode error =
+          validateRequest(receivedRequest.getBlobId().getPartition(), RequestOrResponseType.PutRequest, false);
+      if (error != ServerErrorCode.No_Error) {
+        logger.error("Validating put request failed with error {} for request {}", error, receivedRequest);
+        requestResponseChannel.sendResponse(
+            new PutResponse(receivedRequest.getCorrelationId(), receivedRequest.getClientId(), error), request, null);
+      } else {
+        MessageFormatWriteSet writeSet = getMessageFormatWriteSet(receivedRequest);
+        CloudBlobStore cloudBlobStore =
+            (CloudBlobStore) storeManager.getStore(receivedRequest.getBlobId().getPartition());
+        //TODO: Pass executor to have async completion stages run on configured thread pool instead of letting them
+        // run on default ForkJoinPool
+        cloudBlobStore.putAsync(writeSet).whenCompleteAsync((unused, throwable) -> {
+          if (throwable != null) {
+            Exception ex = Utils.extractFutureExceptionCause(throwable);
+            if (ex instanceof StoreException) {
+              StoreErrorCodes storeErrorCode = ((StoreException) ex).getErrorCode();
+              logger.error("Store exception on a put with error code {} for request {}", storeErrorCode,
+                  receivedRequest, ex);
+              response.set(new PutResponse(receivedRequest.getCorrelationId(), receivedRequest.getClientId(),
+                  ErrorMapping.getStoreErrorMapping(storeErrorCode)));
+            } else {
+              logger.error("Unknown exception on a put for request {}", receivedRequest, ex);
+              response.set(new PutResponse(receivedRequest.getCorrelationId(), receivedRequest.getClientId(),
+                  ServerErrorCode.Unknown_Error));
+            }
+          } else {
+            response.set(new PutResponse(receivedRequest.getCorrelationId(), receivedRequest.getClientId(),
+                ServerErrorCode.No_Error));
+          }
+
+          try {
+            requestResponseChannel.sendResponse(response.get(), request, null);
+          } catch (InterruptedException ie) {
+            logger.warn("Interrupted while enqueuing the response", ie);
+          }
+        });
+      }
+    } catch (MessageFormatException | IOException e) {
+      logger.error("Unknown exception on a put for request {}", receivedRequest, e);
+      requestResponseChannel.sendResponse(
+          new PutResponse(receivedRequest.getCorrelationId(), receivedRequest.getClientId(),
+              ServerErrorCode.Unknown_Error), request, null);
+    }
+  }
+
+  @Override
+  public void handleGetRequest(NetworkRequest request) throws InterruptedException {
+
+    if (!(request instanceof LocalRequestResponseChannel.LocalChannelRequest)) {
+      throw new IllegalArgumentException("The request must be of LocalChannelRequest type");
+    }
+
+    GetRequest getRequest;
+    // This is a case where handleGetRequest is called when frontends are reading from Azure. In this case, this method
+    // is called by request handler threads running within the frontend router itself. So, the request can be directly
+    // referenced as java objects without any need for deserialization.
+    getRequest = (GetRequest) ((LocalRequestResponseChannel.LocalChannelRequest) request).getRequestInfo().getRequest();
+
+    AtomicReference<GetResponse> response = new AtomicReference<>();
+    if (getRequest.getPartitionInfoList().size() > 1) {
+      logger.error("Invalid number of messages in GET request received from Frontend {}", getRequest);
+      response.set(
+          new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(), ServerErrorCode.Bad_Request));
+    }
+    PartitionRequestInfo partitionRequestInfo = getRequest.getPartitionInfoList().iterator().next();
+    ServerErrorCode error =
+        validateRequest(partitionRequestInfo.getPartition(), RequestOrResponseType.GetRequest, false);
+    if (error != ServerErrorCode.No_Error) {
+      logger.error("Validating get request failed for partition {} with error {}", partitionRequestInfo.getPartition(),
+          error);
+      // Send Response
+      requestResponseChannel.sendResponse(
+          new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(), error), request, null);
+    } else {
+      CloudBlobStore cloudBlobStore = (CloudBlobStore) storeManager.getStore(partitionRequestInfo.getPartition());
+      EnumSet<StoreGetOptions> storeGetOptions = getStoreGetOptions(getRequest);
+      cloudBlobStore.getAsync(partitionRequestInfo.getBlobIds(), storeGetOptions)
+          .whenCompleteAsync((info, throwable) -> {
+            if (throwable != null) {
+              Exception ex = Utils.extractFutureExceptionCause(throwable);
+              if (ex instanceof StoreException) {
+                logger.error("Store exception on a get with error code {} for partition {}",
+                    ((StoreException) ex).getErrorCode(), partitionRequestInfo.getPartition(), ex);
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    ErrorMapping.getStoreErrorMapping(((StoreException) ex).getErrorCode())));
+              } else if (ex instanceof MessageFormatException) {
+                logger.error("Message format exception on a get with error code {} for partitionRequestInfo {}",
+                    ((MessageFormatException) ex).getErrorCode(), partitionRequestInfo, ex);
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    ErrorMapping.getMessageFormatErrorMapping(((MessageFormatException) ex).getErrorCode())));
+              } else {
+                logger.error("Unknown exception for request {}", getRequest, ex);
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    ServerErrorCode.Unknown_Error));
+              }
+            } else {
+              MessageFormatSend blobsToSend;
+              try {
+                blobsToSend = new MessageFormatSend(info.getMessageReadSet(), getRequest.getMessageFormatFlag(),
+                    messageFormatMetrics, storeKeyFactory);
+                PartitionResponseInfo partitionResponseInfo =
+                    new PartitionResponseInfo(partitionRequestInfo.getPartition(), info.getMessageReadSetInfo(),
+                        blobsToSend.getMessageMetadataList());
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    Collections.singletonList(partitionResponseInfo), blobsToSend, ServerErrorCode.No_Error));
+              } catch (IOException e) {
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    ServerErrorCode.Unknown_Error));
+              } catch (MessageFormatException ex) {
+                response.set(new GetResponse(getRequest.getCorrelationId(), getRequest.getClientId(),
+                    ErrorMapping.getMessageFormatErrorMapping(ex.getErrorCode())));
+              }
+            }
+            try {
+              // Send Response
+              requestResponseChannel.sendResponse(response.get(), request, null);
+            } catch (InterruptedException ie) {
+              logger.warn("Interrupted while enqueuing the response", ie);
+            }
+          });
+    }
+  }
+
+  @Override
+  public void handleDeleteRequest(NetworkRequest request) throws InterruptedException {
+
+    if (!(request instanceof LocalRequestResponseChannel.LocalChannelRequest)) {
+      throw new IllegalArgumentException("The request must be of LocalChannelRequest type");
+    }
+
+    // This is a case where handleDeleteRequest is called when frontends are talking to Azure. In this case, this method
+    // is called by request handler threads running within the frontend router itself. So, the request can be directly
+    // referenced as java objects without any need for deserialization.
+    DeleteRequest deleteRequest =
+        (DeleteRequest) ((LocalRequestResponseChannel.LocalChannelRequest) request).getRequestInfo().getRequest();
+
+    AtomicReference<DeleteResponse> response = new AtomicReference<>();
+
+    ServerErrorCode error =
+        validateRequest(deleteRequest.getBlobId().getPartition(), RequestOrResponseType.DeleteRequest, false);
+    if (error != ServerErrorCode.No_Error) {
+      logger.error("Validating delete request failed with error {} for request {}", error, deleteRequest);
+      // Send Response
+      requestResponseChannel.sendResponse(
+          new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(), error), request, null);
+    } else {
+      BlobId blobId = deleteRequest.getBlobId();
+      MessageInfo info =
+          new MessageInfo.Builder(deleteRequest.getBlobId(), -1, blobId.getAccountId(), blobId.getContainerId(),
+              deleteRequest.getDeletionTimeInMs()).isDeleted(true)
+              .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND)
+              .build();
+      CloudBlobStore cloudBlobStore = (CloudBlobStore) storeManager.getStore(deleteRequest.getBlobId().getPartition());
+      cloudBlobStore.deleteAsync(Collections.singletonList(info)).whenCompleteAsync((unused, throwable) -> {
+        if (throwable != null) {
+          Exception ex = Utils.extractFutureExceptionCause(throwable);
+          if (ex instanceof StoreException) {
+            logger.error("Store exception on a delete with error code {} for request {}",
+                ((StoreException) ex).getErrorCode(), deleteRequest, ex);
+            response.set(new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(),
+                ErrorMapping.getStoreErrorMapping(((StoreException) ex).getErrorCode())));
+          } else {
+            logger.error("Unknown exception for delete request {}", deleteRequest, ex);
+            response.set(new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(),
+                ServerErrorCode.Unknown_Error));
+          }
+        } else {
+          response.set(new DeleteResponse(deleteRequest.getCorrelationId(), deleteRequest.getClientId(),
+              ServerErrorCode.No_Error));
+        }
+        try {
+          requestResponseChannel.sendResponse(response.get(), request, null);
+        } catch (InterruptedException ie) {
+          logger.warn("Interrupted while enqueuing the response", ie);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void handlePurgeRequest(NetworkRequest request) {
+    throw new UnsupportedOperationException("Purge is not supported in cloud yet.");
+  }
+
+  @Override
+  public void handleTtlUpdateRequest(NetworkRequest request) throws InterruptedException {
+
+    if (!(request instanceof LocalRequestResponseChannel.LocalChannelRequest)) {
+      throw new IllegalArgumentException("The request must be of LocalChannelRequest type");
+    }
+
+    TtlUpdateRequest updateRequest;
+
+    // This is a case where handleTtlUpdateRequest is called when frontends are talking to Azure. In this case, this method
+    // is called by request handler threads running within the frontend router itself. So, the request can be directly
+    // referenced as java objects without any need for deserialization.
+    updateRequest =
+        (TtlUpdateRequest) ((LocalRequestResponseChannel.LocalChannelRequest) request).getRequestInfo().getRequest();
+
+    AtomicReference<TtlUpdateResponse> response = new AtomicReference<>();
+
+    ServerErrorCode error =
+        validateRequest(updateRequest.getBlobId().getPartition(), RequestOrResponseType.TtlUpdateRequest, false);
+    if (error != ServerErrorCode.No_Error) {
+      logger.error("Validating TtlUpdateRequest failed with error {} for request {}", error, updateRequest);
+      // Send Response
+      requestResponseChannel.sendResponse(
+          new TtlUpdateResponse(updateRequest.getCorrelationId(), updateRequest.getClientId(), error), request, null);
+    } else {
+      BlobId blobId = updateRequest.getBlobId();
+      MessageInfo info = new MessageInfo.Builder(blobId, -1, blobId.getAccountId(), blobId.getContainerId(),
+          updateRequest.getOperationTimeInMs()).isTtlUpdated(true)
+          .expirationTimeInMs(updateRequest.getExpiresAtMs())
+          .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND)
+          .build();
+      CloudBlobStore cloudBlobStore = (CloudBlobStore) storeManager.getStore(updateRequest.getBlobId().getPartition());
+      cloudBlobStore.updateTtlAsync(Collections.singletonList(info)).whenCompleteAsync((unused, throwable) -> {
+        if (throwable != null) {
+          Exception ex = Utils.extractFutureExceptionCause(throwable);
+          if (ex instanceof StoreException) {
+            logger.error("Store exception on a TTL update with error code {} for request {}",
+                ((StoreException) ex).getErrorCode(), updateRequest, ex);
+            response.set(new TtlUpdateResponse(updateRequest.getCorrelationId(), updateRequest.getClientId(),
+                ErrorMapping.getStoreErrorMapping(((StoreException) ex).getErrorCode())));
+          } else {
+            logger.error("Unknown exception for TTL update request {}", updateRequest, ex);
+            response.set(new TtlUpdateResponse(updateRequest.getCorrelationId(), updateRequest.getClientId(),
+                ServerErrorCode.Unknown_Error));
+          }
+        } else {
+          response.set(new TtlUpdateResponse(updateRequest.getCorrelationId(), updateRequest.getClientId(),
+              ServerErrorCode.No_Error));
+        }
+        try {
+          requestResponseChannel.sendResponse(response.get(), request, null);
+        } catch (InterruptedException ie) {
+          logger.warn("Interrupted while enqueuing the response", ie);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void handleUndeleteRequest(NetworkRequest request) {
+
+    if (!(request instanceof LocalRequestResponseChannel.LocalChannelRequest)) {
+      throw new IllegalArgumentException("The request must be of LocalChannelRequest type");
+    }
+
+    UndeleteRequest undeleteRequest;
+
+    // This is a case where handleUndeleteRequest is called when frontends are talking to Azure. In this case, this method
+    // is called by request handler threads running within the frontend router itself. So, the request can be directly
+    // referenced as java objects without any need for deserialization.
+    undeleteRequest =
+        (UndeleteRequest) ((LocalRequestResponseChannel.LocalChannelRequest) request).getRequestInfo().getRequest();
+
+    AtomicReference<UndeleteResponse> response = new AtomicReference<>();
+
+    ServerErrorCode error =
+        validateRequest(undeleteRequest.getBlobId().getPartition(), RequestOrResponseType.UndeleteRequest, false);
+    if (error != ServerErrorCode.No_Error) {
+      logger.error("Validating undelete request failed with error {} for request {}", error, undeleteRequest);
+      response.set(new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), error));
+    } else {
+      BlobId blobId = undeleteRequest.getBlobId();
+      MessageInfo info = new MessageInfo.Builder(blobId, -1, blobId.getAccountId(), blobId.getContainerId(),
+          undeleteRequest.getOperationTimeMs()).isUndeleted(true)
+          .lifeVersion(MessageInfo.LIFE_VERSION_FROM_FRONTEND)
+          .build();
+      CloudBlobStore cloudBlobStore =
+          (CloudBlobStore) storeManager.getStore(undeleteRequest.getBlobId().getPartition());
+      cloudBlobStore.undeleteAsync(info).whenCompleteAsync((lifeVersion, throwable) -> {
+        if (throwable != null) {
+          Exception ex = Utils.extractFutureExceptionCause(throwable);
+          if (ex instanceof StoreException) {
+            logger.error("Store exception on an undelete with error code {} for request {}",
+                ((StoreException) ex).getErrorCode(), undeleteRequest, ex);
+            StoreException storeException = (StoreException) ex;
+            if (storeException.getErrorCode() == StoreErrorCodes.ID_Undeleted) {
+              if (ex instanceof IdUndeletedStoreException) {
+                response.set(new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+                    ((IdUndeletedStoreException) ex).getLifeVersion(), ServerErrorCode.Blob_Already_Undeleted));
+              } else {
+                response.set(new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+                    MessageInfo.LIFE_VERSION_FROM_FRONTEND, ServerErrorCode.Blob_Already_Undeleted));
+              }
+            } else {
+              logger.error("Unknown exception for undelete request {}", undeleteRequest, ex);
+              response.set(new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+                  ErrorMapping.getStoreErrorMapping(storeException.getErrorCode())));
+            }
+          } else {
+            logger.error("Unknown exception for undelete request {}", undeleteRequest, ex);
+            response.set(new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(),
+                ServerErrorCode.Unknown_Error));
+          }
+        } else {
+          response.set(
+              new UndeleteResponse(undeleteRequest.getCorrelationId(), undeleteRequest.getClientId(), lifeVersion));
+        }
+        try {
+          requestResponseChannel.sendResponse(response.get(), request, null);
+        } catch (InterruptedException ie) {
+          logger.warn("Interrupted while enqueuing the response", ie);
+        }
+      });
+    }
+  }
+}
diff --git a/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrMain.java b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrMain.java
new file mode 100644
index 0000000000..4c064684aa
--- /dev/null
+++ b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrMain.java
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.vcr;
+
+import com.github.ambry.clustermap.ClusterAgentsFactory;
+import com.github.ambry.commons.LoggingNotificationSystem;
+import com.github.ambry.config.ClusterMapConfig;
+import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.utils.InvocationOptions;
+import com.github.ambry.utils.Utils;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Start point for creating an instance of {@link VcrServer} and starting/shutting it down.
+ */
+public class VcrMain {
+  private static final Logger logger = LoggerFactory.getLogger(VcrMain.class);
+
+  public static void main(String[] args) {
+    final VcrServer vcrServer;
+    int exitCode = 0;
+    try {
+      InvocationOptions options = new InvocationOptions(args);
+      Properties properties = Utils.loadProps(options.serverPropsFilePath);
+      VerifiableProperties verifiableProperties = new VerifiableProperties(properties);
+      ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties);
+      ClusterAgentsFactory clusterAgentsFactory =
+          Utils.getObj(clusterMapConfig.clusterMapClusterAgentsFactory, clusterMapConfig,
+              options.hardwareLayoutFilePath, options.partitionLayoutFilePath);
+      logger.info("Bootstrapping VcrServer");
+      vcrServer = new VcrServer(verifiableProperties, clusterAgentsFactory, new LoggingNotificationSystem(), null);
+      // attach shutdown handler to catch control-c
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        public void run() {
+          logger.info("Received shutdown signal. Shutting down VcrServer");
+          vcrServer.shutdown();
+        }
+      });
+      vcrServer.startup();
+      vcrServer.awaitShutdown(Integer.MAX_VALUE);
+    } catch (Exception e) {
+      logger.error("Exception during bootstrap of VcrServer", e);
+      exitCode = 1;
+    }
+    logger.info("Exiting VcrMain");
+    System.exit(exitCode);
+  }
+}
diff --git a/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrRequests.java b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrRequests.java
new file mode 100644
index 0000000000..8a39df732e
--- /dev/null
+++ b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrRequests.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */ +package com.github.ambry.vcr; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.commons.ServerMetrics; +import com.github.ambry.network.NetworkRequest; +import com.github.ambry.network.RequestResponseChannel; +import com.github.ambry.notification.NotificationSystem; +import com.github.ambry.protocol.RequestOrResponseType; +import com.github.ambry.replication.FindTokenHelper; +import com.github.ambry.replication.ReplicationEngine; +import com.github.ambry.server.AmbryRequests; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.Store; +import com.github.ambry.store.StoreKeyConverterFactory; +import com.github.ambry.store.StoreKeyFactory; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Request implementation class for Vcr. All requests to the vcr server are + * handled by this class. + */ +public class VcrRequests extends AmbryRequests { + + private static final Logger logger = LoggerFactory.getLogger(VcrRequests.class); + + public VcrRequests(StoreManager storeManager, RequestResponseChannel requestResponseChannel, ClusterMap clusterMap, + DataNodeId currentNode, MetricRegistry registry, ServerMetrics serverMetrics, FindTokenHelper findTokenHelper, + NotificationSystem notification, ReplicationEngine replicationEngine, StoreKeyFactory storageKeyFactory, + StoreKeyConverterFactory storeKeyConverterFactory) { + super(storeManager, requestResponseChannel, clusterMap, currentNode, registry, serverMetrics, findTokenHelper, + notification, replicationEngine, storageKeyFactory, storeKeyConverterFactory); + } + + @Override + public void handlePutRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + + @Override + public void handleDeleteRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + + @Override + public void handlePurgeRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + + @Override + public void handleTtlUpdateRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + + @Override + public void handleReplicateBlobRequest(NetworkRequest request) throws IOException, InterruptedException { + throw new UnsupportedOperationException("Request type not supported"); + } + + @Override + protected ServerErrorCode validateRequest(PartitionId partition, RequestOrResponseType requestType, + boolean skipPartitionAvailableCheck) { + // 1. Check partition is null + if (partition == null) { + metrics.badRequestError.inc(); + return ServerErrorCode.Bad_Request; + } + return ServerErrorCode.No_Error; + } + + @Override + protected long getRemoteReplicaLag(Store store, long totalBytesRead) { + return -1; + } +} diff --git a/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrServer.java b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrServer.java new file mode 100644 index 0000000000..3c786f097d --- /dev/null +++ b/ambry-vcr/src/main/java/com/github/ambry/vcr/VcrServer.java @@ -0,0 +1,343 @@ +/** + * Copyright 2019 LinkedIn Corp. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.vcr;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jmx.JmxReporter;
+import com.github.ambry.account.AccountService;
+import com.github.ambry.account.AccountServiceFactory;
+import com.github.ambry.cloud.CloudDestination;
+import com.github.ambry.cloud.CloudDestinationFactory;
+import com.github.ambry.cloud.CloudStorageManager;
+import com.github.ambry.cloud.VcrMetrics;
+import com.github.ambry.cloud.VcrReplicationManager;
+import com.github.ambry.clustermap.ClusterAgentsFactory;
+import com.github.ambry.clustermap.ClusterMap;
+import com.github.ambry.clustermap.DataNodeId;
+import com.github.ambry.clustermap.VcrClusterAgentsFactory;
+import com.github.ambry.clustermap.VcrClusterParticipant;
+import com.github.ambry.commons.NettySslHttp2Factory;
+import com.github.ambry.commons.SSLFactory;
+import com.github.ambry.commons.ServerMetrics;
+import com.github.ambry.config.CloudConfig;
+import com.github.ambry.config.ClusterMapConfig;
+import com.github.ambry.config.ConnectionPoolConfig;
+import com.github.ambry.config.Http2ClientConfig;
+import com.github.ambry.config.NettyConfig;
+import com.github.ambry.config.NetworkConfig;
+import com.github.ambry.config.ReplicationConfig;
+import com.github.ambry.config.SSLConfig;
+import com.github.ambry.config.ServerConfig;
+import com.github.ambry.config.StoreConfig;
+import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.network.NettyServerRequestResponseChannel;
+import com.github.ambry.network.NetworkClientFactory;
+import com.github.ambry.network.NetworkMetrics;
+import com.github.ambry.network.NetworkServer;
+import com.github.ambry.network.Port;
+import com.github.ambry.network.PortType;
+import com.github.ambry.network.ServerRequestResponseHelper;
+import com.github.ambry.network.SocketNetworkClientFactory;
+import com.github.ambry.network.SocketServer;
+import com.github.ambry.network.http2.Http2ClientMetrics;
+import com.github.ambry.network.http2.Http2NetworkClientFactory;
+import com.github.ambry.network.http2.Http2ServerMetrics;
+import com.github.ambry.notification.NotificationSystem;
+import com.github.ambry.protocol.RequestHandlerPool;
+import com.github.ambry.replication.FindTokenHelper;
+import com.github.ambry.rest.NettyMetrics;
+import com.github.ambry.rest.NioServer;
+import com.github.ambry.rest.NioServerFactory;
+import com.github.ambry.rest.ServerSecurityService;
+import com.github.ambry.rest.ServerSecurityServiceFactory;
+import com.github.ambry.rest.StorageServerNettyFactory;
+import com.github.ambry.server.StoreManager;
+import com.github.ambry.store.StoreKeyConverterFactory;
+import com.github.ambry.store.StoreKeyFactory;
+import com.github.ambry.utils.SystemTime;
+import com.github.ambry.utils.Time;
+import com.github.ambry.utils.Utils;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.github.ambry.utils.Utils.*;
+
+
+/**
+ * Virtual Cloud Replicator server
+ */
+public class VcrServer {
+
+  private CountDownLatch shutdownLatch = new CountDownLatch(1);
+  private NioServer nettyHttp2Server;
+  private NetworkServer networkServer = null;
+  private ScheduledExecutorService scheduler = null;
+  private VcrReplicationManager vcrReplicationManager = null;
+  private StoreManager cloudStorageManager = null;
+  private static final Logger logger = LoggerFactory.getLogger(VcrServer.class);
+  private final VerifiableProperties properties;
+  private final ClusterAgentsFactory clusterAgentsFactory;
+  private ClusterMap clusterMap;
+  private VcrClusterParticipant vcrClusterParticipant;
+  private MetricRegistry registry = null;
+  private JmxReporter reporter = null;
+  private final NotificationSystem notificationSystem;
+  private final Function<MetricRegistry, JmxReporter> reporterFactory;
+  private CloudDestinationFactory cloudDestinationFactory;
+  private RequestHandlerPool requestHandlerPool;
+  private RequestHandlerPool requestHandlerPoolForHttp2;
+  private CloudDestination cloudDestination;
+  private ServerSecurityService serverSecurityService;
+  private ServerMetrics serverMetrics;
+
+  /**
+   * VcrServer constructor.
+   * @param properties the config properties to use.
+   * @param clusterAgentsFactory the {@link ClusterAgentsFactory} to use.
+   * @param notificationSystem the {@link NotificationSystem} to use.
+   * @param reporterFactory if non-null, use this function to set up a {@link JmxReporter} with custom settings. If this
+   *                        option is null the default settings for the reporter will be used.
+   */
+  public VcrServer(VerifiableProperties properties, ClusterAgentsFactory clusterAgentsFactory,
+      NotificationSystem notificationSystem, Function<MetricRegistry, JmxReporter> reporterFactory) {
+    this.properties = properties;
+    this.clusterAgentsFactory = clusterAgentsFactory;
+    this.notificationSystem = notificationSystem;
+    this.reporterFactory = reporterFactory;
+  }
+
+  /**
+   * Test constructor.
+   * @param properties the config properties to use.
+   * @param clusterAgentsFactory the {@link ClusterAgentsFactory} to use.
+   * @param notificationSystem the {@link NotificationSystem} to use.
+   * @param cloudDestinationFactory the {@link CloudDestinationFactory} to use.
+   * @param reporterFactory if non-null, use this function to set up a {@link JmxReporter} with custom settings. If this
+   *                        option is null the default settings for the reporter will be used.
+   */
+  public VcrServer(VerifiableProperties properties, ClusterAgentsFactory clusterAgentsFactory,
+      NotificationSystem notificationSystem, CloudDestinationFactory cloudDestinationFactory,
+      Function<MetricRegistry, JmxReporter> reporterFactory) {
+    this(properties, clusterAgentsFactory, notificationSystem, reporterFactory);
+    this.cloudDestinationFactory = cloudDestinationFactory;
+  }
+
+  /**
+   * Start the VCR Server.
+   * @throws InstantiationException if an error was encountered during startup.
+ */ + public void startup() throws InstantiationException { + try { + logger.info("starting"); + ServerConfig serverConfig = new ServerConfig(properties); + ServerSecurityServiceFactory serverSecurityServiceFactory = + Utils.getObj(serverConfig.serverSecurityServiceFactory, properties, serverMetrics, registry); + serverSecurityService = serverSecurityServiceFactory.getServerSecurityService(); + + clusterMap = clusterAgentsFactory.getClusterMap(); + logger.info("Initialized clusterMap"); + registry = clusterMap.getMetricRegistry(); + serverMetrics = new ServerMetrics(registry, VcrServer.class, VcrServer.class); + + logger.info("Setting up JMX."); + long startTime = SystemTime.getInstance().milliseconds(); + registry = clusterMap.getMetricRegistry(); + reporter = reporterFactory != null ? reporterFactory.apply(registry) : JmxReporter.forRegistry(registry).build(); + reporter.start(); + + logger.info("creating configs"); + NetworkConfig networkConfig = new NetworkConfig(properties); + StoreConfig storeConfig = new StoreConfig(properties); + ReplicationConfig replicationConfig = new ReplicationConfig(properties); + CloudConfig cloudConfig = new CloudConfig(properties); + ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig(properties); + ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); + SSLConfig sslConfig = new SSLConfig(properties); + // verify the configs + properties.verify(); + + // TODO Make sure that config.updaterPollingIntervalMs value is large (~one day) for VCR. + AccountServiceFactory accountServiceFactory = + Utils.getObj(serverConfig.serverAccountServiceFactory, properties, registry); + AccountService accountService = accountServiceFactory.getAccountService(); + + scheduler = Utils.newScheduler(serverConfig.serverSchedulerNumOfthreads, false); + StoreKeyFactory storeKeyFactory = Utils.getObj(storeConfig.storeKeyFactory, clusterMap); + + SSLFactory sslHttp2Factory = new NettySslHttp2Factory(sslConfig); + NetworkClientFactory networkClientFactory = null; + Time time = SystemTime.getInstance(); + if (clusterMapConfig.clusterMapEnableHttp2Replication) { + Http2ClientMetrics http2ClientMetrics = new Http2ClientMetrics(registry); + Http2ClientConfig http2ClientConfig = new Http2ClientConfig(properties); + networkClientFactory = + new Http2NetworkClientFactory(http2ClientMetrics, http2ClientConfig, sslHttp2Factory, time); + logger.info("Using http2 for VCR replication."); + } else { + SSLFactory sslSocketFactory = + clusterMapConfig.clusterMapSslEnabledDatacenters.length() > 0 ? 
+        networkClientFactory =
+            new SocketNetworkClientFactory(new NetworkMetrics(registry), networkConfig, sslSocketFactory, 20, 20, 50000,
+                time);
+        logger.info("Using socket channel for VCR replication.");
+      }
+
+      StoreKeyConverterFactory storeKeyConverterFactory =
+          Utils.getObj(serverConfig.serverStoreKeyConverterFactory, properties, registry);
+      VcrMetrics vcrMetrics = new VcrMetrics(registry);
+
+      logger.info("Ambry backup version = {}", cloudConfig.ambryBackupVersion);
+      if (cloudDestinationFactory == null) {
+        cloudDestinationFactory =
+            Utils.getObj(cloudConfig.cloudDestinationFactoryClass, properties, registry, clusterMap, accountService);
+      }
+      cloudDestination = cloudDestinationFactory.getCloudDestination();
+      vcrClusterParticipant =
+          ((VcrClusterAgentsFactory) Utils.getObj(cloudConfig.vcrClusterAgentsFactoryClass, cloudConfig,
+              clusterMapConfig, clusterMap, accountService, storeConfig, cloudDestination,
+              registry)).getVcrClusterParticipant();
+      cloudStorageManager = new CloudStorageManager(properties, vcrMetrics, cloudDestination, clusterMap);
+      vcrReplicationManager = new VcrReplicationManager(cloudConfig, replicationConfig, clusterMapConfig, storeConfig,
+          cloudStorageManager, storeKeyFactory, clusterMap, vcrClusterParticipant, cloudDestination, scheduler,
+          networkClientFactory, vcrMetrics, notificationSystem, storeKeyConverterFactory,
+          serverConfig.serverMessageTransformer);
+      vcrReplicationManager.start();
+
+      DataNodeId currentNode = vcrClusterParticipant.getCurrentDataNodeId();
+      ArrayList<Port> ports = new ArrayList<Port>();
+      ports.add(new Port(networkConfig.port, PortType.PLAINTEXT));
+      if (currentNode.hasSSLPort()) {
+        ports.add(new Port(cloudConfig.vcrSslPort, PortType.SSL));
+      }
+      networkServer = new SocketServer(networkConfig, sslConfig, registry, ports);
+
+      //todo fix enableDataPrefetch
+      ServerMetrics serverMetrics = new ServerMetrics(registry, VcrRequests.class, VcrServer.class);
+      VcrRequests requests =
+          new VcrRequests(cloudStorageManager, networkServer.getRequestResponseChannel(), clusterMap, currentNode,
+              registry, serverMetrics, new FindTokenHelper(storeKeyFactory, replicationConfig), notificationSystem,
+              vcrReplicationManager, storeKeyFactory, storeKeyConverterFactory);
+
+      requestHandlerPool = new RequestHandlerPool(serverConfig.serverRequestHandlerNumSocketServerThreads,
+          networkServer.getRequestResponseChannel(), requests);
+
+      networkServer.start();
+
+      // Start netty http2 server
+      if (currentNode.hasHttp2Port()) {
+        logger.info("Http2 port {} is enabled. Starting HTTP/2 service.", currentNode.getHttp2Port());
+        NettyConfig nettyConfig = new NettyConfig(properties);
+        NettyMetrics nettyMetrics = new NettyMetrics(registry);
+        Http2ServerMetrics http2ServerMetrics = new Http2ServerMetrics(registry);
+        Http2ClientConfig http2ClientConfig = new Http2ClientConfig(properties);
+        FindTokenHelper findTokenHelper = new FindTokenHelper(storeKeyFactory, replicationConfig);
+        NettyServerRequestResponseChannel requestResponseChannel =
+            new NettyServerRequestResponseChannel(networkConfig, http2ServerMetrics, serverMetrics,
+                new ServerRequestResponseHelper(clusterMap, findTokenHelper));
+        VcrRequests vcrRequestsForHttp2 =
+            new VcrRequests(cloudStorageManager, requestResponseChannel, clusterMap, currentNode, registry,
+                serverMetrics, findTokenHelper, notificationSystem, vcrReplicationManager, storeKeyFactory,
+                storeKeyConverterFactory);
+        requestHandlerPoolForHttp2 =
+            new RequestHandlerPool(serverConfig.serverRequestHandlerNumOfThreads, requestResponseChannel,
+                vcrRequestsForHttp2);
+
+        NioServerFactory nioServerFactory =
+            new StorageServerNettyFactory(currentNode.getHttp2Port(), requestResponseChannel, sslHttp2Factory,
+                nettyConfig, http2ClientConfig, serverMetrics, nettyMetrics, http2ServerMetrics, serverSecurityService);
+        nettyHttp2Server = nioServerFactory.getNioServer();
+        nettyHttp2Server.start();
+      }
+
+      long processingTime = SystemTime.getInstance().milliseconds() - startTime;
+      logger.info("VCR startup time in Ms {}", processingTime);
+    } catch (Exception e) {
+      logger.error("Error during VCR startup", e);
+      throw new InstantiationException("failure during VCR startup " + e);
+    }
+  }
+
+  /**
+   * This method is expected to be called in the exit path as long as the VcrServer instance construction was
+   * successful. This is expected to be called even if {@link #startup()} did not succeed.
+ */ + public void shutdown() { + long startTime = SystemTime.getInstance().milliseconds(); + try { + logger.info("VCR shutdown started"); + if (scheduler != null) { + shutDownExecutorService(scheduler, 5, TimeUnit.MINUTES); + } + if (requestHandlerPool != null) { + requestHandlerPool.shutdown(); + } + if (networkServer != null) { + networkServer.shutdown(); + } + if (requestHandlerPoolForHttp2 != null) { + requestHandlerPoolForHttp2.shutdown(); + } + if (nettyHttp2Server != null) { + nettyHttp2Server.shutdown(); + } + if (vcrReplicationManager != null) { + vcrReplicationManager.shutdown(); + } + if (reporter != null) { + reporter.stop(); + } + if (notificationSystem != null) { + try { + notificationSystem.close(); + } catch (IOException e) { + logger.error("Error while closing notification system.", e); + } + } + if (clusterMap != null) { + clusterMap.close(); + } + if (vcrClusterParticipant != null) { + vcrClusterParticipant.close(); + } + if (cloudDestination != null) { + cloudDestination.close(); + } + logger.info("VCR shutdown completed"); + } catch (Exception e) { + logger.error("Error while shutting down VCR", e); + } finally { + shutdownLatch.countDown(); + long processingTime = SystemTime.getInstance().milliseconds() - startTime; + logger.info("VCR shutdown time in Ms {}", processingTime); + } + } + + public boolean awaitShutdown(int timeoutMs) throws InterruptedException { + return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + } + + public VcrClusterParticipant getVcrClusterParticipant() { + return vcrClusterParticipant; + } + + public VcrReplicationManager getVcrReplicationManager() { + return vcrReplicationManager; + } +} diff --git a/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestination.java b/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestination.java new file mode 100644 index 0000000000..a41305fb63 --- /dev/null +++ b/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestination.java @@ -0,0 +1,628 @@ +/** + * Copyright 2019 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package com.github.ambry.vcr; + +import com.github.ambry.account.Container; +import com.github.ambry.cloud.CloudBlobMetadata; +import com.github.ambry.cloud.CloudBlobStore; +import com.github.ambry.cloud.CloudContainerCompactor; +import com.github.ambry.cloud.CloudDestination; +import com.github.ambry.cloud.CloudStorageCompactor; +import com.github.ambry.cloud.CloudStorageException; +import com.github.ambry.cloud.CloudUpdateValidator; +import com.github.ambry.cloud.FindResult; +import com.github.ambry.cloud.azure.AzureReplicationFeed; +import com.github.ambry.cloud.azure.CosmosChangeFeedFindToken; +import com.github.ambry.cloud.azure.CosmosContainerDeletionEntry; +import com.github.ambry.cloud.azure.CosmosUpdateTimeFindToken; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.FutureUtils; +import com.github.ambry.replication.FindToken; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.store.MessageInfo; +import com.github.ambry.store.StoreErrorCodes; +import com.github.ambry.store.StoreException; +import com.github.ambry.utils.Pair; +import com.github.ambry.utils.Utils; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An latch based in memory implementation of {@link CloudDestination}. + */ +public class LatchBasedInMemoryCloudDestination implements CloudDestination { + + private final static Logger logger = LoggerFactory.getLogger(LatchBasedInMemoryCloudDestination.class); + private final static AzureReplicationFeed.FeedType DEFAULT_AZURE_REPLICATION_FEED_TYPE = + AzureReplicationFeed.FeedType.COSMOS_CHANGE_FEED; + private final Map> map = new HashMap<>(); + private final CountDownLatch uploadLatch; + private final CountDownLatch downloadLatch; + private final Set blobIdsToTrack = ConcurrentHashMap.newKeySet(); + private final Map tokenMap = new ConcurrentHashMap<>(); + private final AtomicLong bytesUploadedCounter = new AtomicLong(0); + private final AtomicInteger blobsUploadedCounter = new AtomicInteger(0); + private final ChangeFeed changeFeed = new ChangeFeed(); + private final AzureReplicationFeed.FeedType azureReplicationFeedType; + private final Set deprecatedContainers = new HashSet<>(); + private final ClusterMap clusterMap; + + // Used in error simulation. + private final Map errorCodeForBlobs = new HashMap<>(); + private StoreErrorCodes hardError = null; + private LinkedList serverErrors = new LinkedList(); + + /** + * Instantiate {@link LatchBasedInMemoryCloudDestination}. + * Use this constructor for tests where type of azure replication feed doesn't matter. 
+   * @param blobIdsToTrack a list of blobs that {@link LatchBasedInMemoryCloudDestination} tracks.
+   * @param clusterMap {@link ClusterMap} object.
+   */
+  public LatchBasedInMemoryCloudDestination(List<BlobId> blobIdsToTrack, ClusterMap clusterMap) {
+    logger.debug("Constructing LatchBasedInMemoryCloudDestination with {} tracked blobs", blobIdsToTrack.size());
+    this.blobIdsToTrack.addAll(blobIdsToTrack);
+    uploadLatch = new CountDownLatch(blobIdsToTrack.size());
+    downloadLatch = new CountDownLatch(blobIdsToTrack.size());
+    this.azureReplicationFeedType = DEFAULT_AZURE_REPLICATION_FEED_TYPE;
+    this.clusterMap = clusterMap;
+  }
+
+  /**
+   * Instantiate {@link LatchBasedInMemoryCloudDestination}.
+   * @param blobIdsToTrack a list of blobs that {@link LatchBasedInMemoryCloudDestination} tracks.
+   * @param azureReplicationFeedType {@link AzureReplicationFeed.FeedType} object.
+   */
+  public LatchBasedInMemoryCloudDestination(List<BlobId> blobIdsToTrack,
+      AzureReplicationFeed.FeedType azureReplicationFeedType, ClusterMap clusterMap) {
+    logger.debug("Constructing LatchBasedInMemoryCloudDestination with {} tracked blobs", blobIdsToTrack.size());
+    this.blobIdsToTrack.addAll(blobIdsToTrack);
+    uploadLatch = new CountDownLatch(blobIdsToTrack.size());
+    downloadLatch = new CountDownLatch(blobIdsToTrack.size());
+    this.azureReplicationFeedType = azureReplicationFeedType;
+    this.clusterMap = clusterMap;
+  }
+
+  @Override
+  synchronized public boolean uploadBlob(BlobId blobId, long blobSize, CloudBlobMetadata cloudBlobMetadata,
+      InputStream blobInputStream) throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("uploadBlob simulated error",
+          new StoreException("uploadBlob simulated error", serverError));
+    }
+
+    if (map.containsKey(blobId)) {
+      return false;
+    }
+    // Note: blobSize can be -1 when we don't know the actual blob size being uploaded.
+    // So we have to do buffered reads to handle that case.
+    int bufferSz = (blobSize == -1) ? 1024 : (int) blobSize;
+    byte[] buffer = new byte[bufferSz];
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    try {
+      int bytesRead = blobInputStream.read(buffer);
+      while (bytesRead > 0) {
+        outputStream.write(buffer, 0, bytesRead);
+        bytesUploadedCounter.addAndGet(Math.max(bytesRead, 0));
+        bytesRead = blobInputStream.read(buffer);
+      }
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    cloudBlobMetadata.setLastUpdateTime(System.currentTimeMillis());
+    map.put(blobId, new Pair<>(cloudBlobMetadata, outputStream.toByteArray()));
+    changeFeed.add(blobId);
+    blobsUploadedCounter.incrementAndGet();
+    if (blobIdsToTrack.remove(blobId)) {
+      uploadLatch.countDown();
+    }
+    return true;
+  }
+
+  @Override
+  public CompletableFuture<Boolean> uploadBlobAsync(BlobId blobId, long inputLength,
+      CloudBlobMetadata cloudBlobMetadata, InputStream blobInputStream) {
+    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture,
+        () -> uploadBlob(blobId, inputLength, cloudBlobMetadata, blobInputStream));
+    return completableFuture;
+  }
+
+  @Override
+  public void downloadBlob(BlobId blobId, OutputStream outputStream) throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("downloadBlob simulated error for blobid :" + blobId,
+          new StoreException("downloadBlob simulated error for blobid :" + blobId, serverError));
+    }
+
+    try {
+      if (!map.containsKey(blobId)) {
+        throw new CloudStorageException("Blob with blobId " + blobId.getID() + " does not exist.");
+      }
+      byte[] blobData = map.get(blobId).getSecond();
+      outputStream.write(blobData);
+    } catch (IOException ex) {
+      throw new CloudStorageException(
+          "Could not download blob for blobid " + blobId.getID() + " due to " + ex.toString());
+    }
+    downloadLatch.countDown();
+  }
+
+  @Override
+  public CompletableFuture<Void> downloadBlobAsync(BlobId blobId, OutputStream outputStream) {
+    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture, () -> {
+      downloadBlob(blobId, outputStream);
+      return null;
+    });
+    return completableFuture;
+  }
+
+  @Override
+  public boolean deleteBlob(BlobId blobId, long deletionTime, short lifeVersion,
+      CloudUpdateValidator cloudUpdateValidator) throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("deleteBlob simulated error",
+          new StoreException("deleteBlob simulated error", serverError));
+    }
+
+    if (!map.containsKey(blobId)) {
+      return false;
+    }
+    // The lifeVersion from message info is -1 when the undelete method is invoked by frontend request, we have to
+    // get the legit lifeVersion before we can write undelete record to log segment.
+    if (!MessageInfo.hasLifeVersion(lifeVersion)) {
+      lifeVersion = map.get(blobId).getFirst().getLifeVersion();
+    }
+    map.get(blobId).getFirst().setDeletionTime(deletionTime);
+    map.get(blobId).getFirst().setLifeVersion(lifeVersion);
+    map.get(blobId).getFirst().setLastUpdateTime(System.currentTimeMillis());
+    changeFeed.add(blobId);
+    return true;
+  }
+
+  @Override
+  public CompletableFuture<Boolean> deleteBlobAsync(BlobId blobId, long deletionTime, short lifeVersion,
+      CloudUpdateValidator cloudUpdateValidator) {
+    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture,
+        () -> deleteBlob(blobId, deletionTime, lifeVersion, cloudUpdateValidator));
+    return completableFuture;
+  }
+
+  @Override
+  public short updateBlobExpiration(BlobId blobId, long expirationTime, CloudUpdateValidator cloudUpdateValidator)
+      throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("updateBlobExpiration simulated error",
+          new StoreException("updateBlobExpiration simulated error", serverError));
+    }
+
+    if (map.containsKey(blobId)) {
+      map.get(blobId).getFirst().setExpirationTime(expirationTime);
+      map.get(blobId).getFirst().setLastUpdateTime(System.currentTimeMillis());
+      changeFeed.add(blobId);
+      return map.get(blobId).getFirst().getLifeVersion();
+    }
+    throw new CloudStorageException(String.format("Blob %s not found", blobId.getID()));
+  }
+
+  @Override
+  public CompletableFuture<Short> updateBlobExpirationAsync(BlobId blobId, long expirationTime,
+      CloudUpdateValidator cloudUpdateValidator) {
+    CompletableFuture<Short> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture,
+        () -> updateBlobExpiration(blobId, expirationTime, cloudUpdateValidator));
+    return completableFuture;
+  }
+
+  @Override
+  public short undeleteBlob(BlobId blobId, short lifeVersion, CloudUpdateValidator cloudUpdateValidator)
+      throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("undeleteBlob simulated error",
+          new StoreException("undeleteBlob simulated error", serverError));
+    }
+
+    if (map.containsKey(blobId)) {
+      if (!MessageInfo.hasLifeVersion((lifeVersion))) {
+        lifeVersion = map.get(blobId).getFirst().getLifeVersion();
+        lifeVersion++;
+      }
+      map.get(blobId).getFirst().setLifeVersion(lifeVersion);
+      map.get(blobId).getFirst().setDeletionTime(Utils.Infinite_Time);
+      map.get(blobId).getFirst().setLastUpdateTime(System.currentTimeMillis());
+      changeFeed.add(blobId);
+      return map.get(blobId).getFirst().getLifeVersion();
+    } else {
+      throw new CloudStorageException(
+          String.format("Cannot update lifeversion as blob %s is not found.", blobId.getID()), null,
+          CloudBlobStore.STATUS_NOT_FOUND, false, null);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Short> undeleteBlobAsync(BlobId blobId, short lifeVersion,
+      CloudUpdateValidator cloudUpdateValidator) {
+    CompletableFuture<Short> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture, () -> undeleteBlob(blobId, lifeVersion, cloudUpdateValidator));
+    return completableFuture;
+  }
+
+  @Override
+  public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) throws CloudStorageException {
+    StoreErrorCodes serverError = hardError != null ? hardError : serverErrors.size() > 0 ? serverErrors.poll() : null;
+    if (serverError != null) {
+      throw new CloudStorageException("getBlobMetadata simulated error",
+          new StoreException("getBlobMetadata simulated error", serverError));
+    }
+
+    Map<String, CloudBlobMetadata> result = new HashMap<>();
+    for (BlobId blobId : blobIds) {
+      if (map.containsKey(blobId)) {
+        result.put(blobId.toString(), map.get(blobId).getFirst());
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<Map<String, CloudBlobMetadata>> getBlobMetadataAsync(List<BlobId> blobIds) {
+    CompletableFuture<Map<String, CloudBlobMetadata>> completableFuture = new CompletableFuture<>();
+    FutureUtils.completeFromCallable(completableFuture, () -> getBlobMetadata(blobIds));
+    return completableFuture;
+  }
+
+  @Override
+  public FindResult findEntriesSince(String partitionPath, FindToken findToken, long maxTotalSizeOfEntries) {
+    switch (azureReplicationFeedType) {
+      case COSMOS_CHANGE_FEED:
+        return findChangeFeedBasedEntries(partitionPath, findToken, maxTotalSizeOfEntries);
+      case COSMOS_UPDATE_TIME:
+        return findUpdateTimeBasedEntries(partitionPath, findToken, maxTotalSizeOfEntries);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown azure replication feed type: %s", azureReplicationFeedType));
+    }
+  }
+
+  /**
+   * To be consistent with MockServer, we need the following four error injection APIs.
+   * Different from MockServer, we use {@link StoreErrorCodes} instead of {@link ServerErrorCode} as the error code.
+   */
+
+  /**
+   * Set the error for each request from this point onwards that affects subsequent requests sent to this node
+   * (until/unless the next set or reset error method is invoked).
+   * Each request from the list is used exactly once and in order. So, if the list contains {No_Error, Unknown_Error,
+   * Disk_Error}, then the first, second and third requests would receive No_Error,
+   * Unknown_Error and Disk_Error respectively. Once the errors are exhausted, the default No_Error is assumed for
+   * all further requests until the next call to this method.
+   * @param serverErrors the list of errors that affects subsequent PutRequests.
+   */
+  public void setServerErrors(List<StoreErrorCodes> serverErrors) {
+    this.serverErrors.clear();
+    this.serverErrors.addAll(serverErrors);
+  }
+
+  /**
+   * Set the error to be set in the responses for all requests from this point onwards (until/unless another set or
+   * reset method for errors is invoked).
+   * @param serverError the error to set from this point onwards.
+   */
+  public void setServerErrorForAllRequests(StoreErrorCodes serverError) {
+    this.hardError = serverError;
+  }
+
+  /**
+   * Clear the error for subsequent requests. That is, all responses from this point onwards will be successful
+   * until/unless another set error method is invoked.
+   */
+  public void resetServerErrors() {
+    this.serverErrors.clear();
+    this.hardError = null;
+  }
+
+  /**
+   * Sets up this {@link LatchBasedInMemoryCloudDestination} such that it returns the given {@code errorCode} for the given {@code blobId} for
+   * get, ttl update and delete (not put).
+   * @param blobId the blob id for which the error code must apply
+   * @param errorCode the error code to apply
+   */
+  public void setErrorCodeForBlob(String blobId, StoreErrorCodes errorCode) {
+    // not supported yet
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Populates an ordered sequenced list of blobs in the specified partition in {@code nextEntries} {@link List},
+   * ordered by change feed. Returns the updated {@link FindToken}.
+   * @param partitionPath the partition to query.
+  /**
+   * Returns an ordered list of blobs in the specified partition, ordered by change feed, along with the updated
+   * {@link FindToken}.
+   * @param partitionPath the partition to query.
+   * @param findToken the {@link FindToken} specifying the boundary for the query.
+   * @param maxTotalSizeOfEntries the cumulative size limit for the list of blobs returned.
+   * @return {@link FindResult} instance that contains the updated {@link CosmosChangeFeedFindToken}, which can act
+   *         as a bookmark for subsequent requests, and a {@link List} of {@link CloudBlobMetadata} entries.
+   */
+  private FindResult findChangeFeedBasedEntries(String partitionPath, FindToken findToken,
+      long maxTotalSizeOfEntries) {
+    List<CloudBlobMetadata> nextEntries = new ArrayList<>();
+    String continuationToken = ((CosmosChangeFeedFindToken) findToken).getEndContinuationToken();
+    List<BlobId> blobIds = new ArrayList<>();
+    getFeed(continuationToken, maxTotalSizeOfEntries, blobIds);
+    nextEntries.addAll(blobIds.stream().map(blobId -> map.get(blobId).getFirst()).collect(Collectors.toList()));
+    CosmosChangeFeedFindToken cosmosChangeFeedFindToken = (CosmosChangeFeedFindToken) findToken;
+    if (!blobIds.isEmpty()) {
+      long bytesToBeRead = nextEntries.stream().mapToLong(CloudBlobMetadata::getSize).sum();
+      cosmosChangeFeedFindToken =
+          new CosmosChangeFeedFindToken(bytesToBeRead, changeFeed.getContinuationTokenForBlob(blobIds.get(0)),
+              createEndContinuationToken(blobIds), 0, blobIds.size(), changeFeed.getReqUuid(),
+              cosmosChangeFeedFindToken.getVersion());
+    }
+    return new FindResult(nextEntries, cosmosChangeFeedFindToken);
+  }
+
+  /**
+   * Returns an ordered list of blobs in the specified partition, ordered by blob update time, along with the updated
+   * {@link FindToken}.
+   * @param partitionPath the partition to query.
+   * @param findToken the {@link FindToken} specifying the boundary for the query.
+   * @param maxTotalSizeOfEntries the cumulative size limit for the list of blobs returned.
+   * @return {@link FindResult} instance that contains the updated {@link CosmosUpdateTimeFindToken}, which can act
+   *         as a bookmark for subsequent requests, and a {@link List} of {@link CloudBlobMetadata} entries.
+   */
+  private FindResult findUpdateTimeBasedEntries(String partitionPath, FindToken findToken,
+      long maxTotalSizeOfEntries) {
+    List<CloudBlobMetadata> nextEntries = new ArrayList<>();
+    CosmosUpdateTimeFindToken cosmosUpdateTimeFindToken = (CosmosUpdateTimeFindToken) findToken;
+    List<CloudBlobMetadata> entries = new LinkedList<>();
+    for (BlobId blobId : map.keySet()) {
+      if (map.get(blobId).getFirst().getLastUpdateTime() >= cosmosUpdateTimeFindToken.getLastUpdateTime()) {
+        if (cosmosUpdateTimeFindToken.getLastUpdateTimeReadBlobIds().contains(map.get(blobId).getFirst().getId())) {
+          continue;
+        }
+        entries.add(map.get(blobId).getFirst());
+      }
+    }
+    Collections.sort(entries, Comparator.comparingLong(CloudBlobMetadata::getLastUpdateTime));
+
+    List<CloudBlobMetadata> cappedResults = CloudBlobMetadata.capMetadataListBySize(entries, maxTotalSizeOfEntries);
+    nextEntries.addAll(cappedResults);
+    return new FindResult(nextEntries,
+        CosmosUpdateTimeFindToken.getUpdatedToken(cosmosUpdateTimeFindToken, cappedResults));
+  }
+
+  private String createEndContinuationToken(List<BlobId> blobIds) {
+    return Integer.toString(
+        Integer.parseInt(changeFeed.getContinuationTokenForBlob(blobIds.get(blobIds.size() - 1))) + 1);
+  }
+
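The two find methods above give replication a resumable cursor: each call returns a batch plus an updated token to pass back in. A rough paging loop in change-feed mode (a sketch, not part of this diff; it assumes a default CosmosChangeFeedFindToken is a valid start token and that FindResult exposes getMetadataList()/getUpdatedFindToken() accessors):

FindToken token = new CosmosChangeFeedFindToken();
List<CloudBlobMetadata> seen = new ArrayList<>();
while (true) {
  FindResult result = dest.findEntriesSince("partition1", token, 4 * 1024 * 1024);
  if (result.getMetadataList().isEmpty()) {
    break; // caught up with the change feed
  }
  seen.addAll(result.getMetadataList());
  token = result.getUpdatedFindToken(); // bookmark for the next call
}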
+  /**
+   * Get the change feed starting from the given continuation token, up to a cumulative blob size of
+   * {@code maxTotalSizeOfEntries}.
+   * @param continuationToken starting token for the change feed.
+   * @param maxTotalSizeOfEntries max size of all the blobs returned in the change feed.
+   * @param feed {@link List} of {@link BlobId}s to be populated with the change feed.
+   */
+  private void getFeed(String continuationToken, long maxTotalSizeOfEntries, List<BlobId> feed) {
+    int continuationTokenCounter = changeFeed.getContinuationTokenCounter();
+    if (continuationToken.equals("")) {
+      continuationToken = "0";
+    }
+    // if there are no changes since the last continuation token, or there is no change feed at all, return
+    if (Integer.parseInt(continuationToken) == continuationTokenCounter + 1 || continuationTokenCounter == -1) {
+      return;
+    }
+    // check if it is an invalid continuation token
+    if (!changeFeed.getContinuationTokenToBlobIdMap().containsKey(continuationToken)) {
+      throw new IllegalArgumentException("Invalid continuation token");
+    }
+    // iterate through the change feed until it ends or maxTotalSizeOfEntries is reached
+    String continuationTokenCtr = continuationToken;
+    long totalFeedSize = 0;
+    while (changeFeed.getContinuationTokenToBlobIdMap().containsKey(continuationTokenCtr)
+        && totalFeedSize < maxTotalSizeOfEntries) {
+      if (changeFeed.getContinuationTokenToBlobIdMap().get(continuationTokenCtr) != null) {
+        feed.add(changeFeed.getContinuationTokenToBlobIdMap().get(continuationTokenCtr));
+        totalFeedSize +=
+            map.get(changeFeed.getContinuationTokenToBlobIdMap().get(continuationTokenCtr)).getFirst().getSize();
+      }
+      continuationTokenCtr = Integer.toString(Integer.parseInt(continuationTokenCtr) + 1);
+    }
+  }
+
+  @Override
+  public int compactPartition(String partitionPath) {
+    return 0;
+  }
+
+  @Override
+  public boolean stopCompaction() {
+    return true;
+  }
+
+  @Override
+  public boolean isCompactionStopped(String unused, CloudStorageCompactor unused2) {
+    return false;
+  }
+
+  @Override
+  public void deprecateContainers(Collection<Container> deletedContainers) {
+    this.deprecatedContainers.addAll(deletedContainers.stream()
+        .map(container -> CosmosContainerDeletionEntry.fromContainer(container,
+            clusterMap.getAllPartitionIds(null).stream().map(PartitionId::toPathString).collect(Collectors.toSet())))
+        .collect(Collectors.toList()));
+  }
+
+  @Override
+  public CloudContainerCompactor getContainerCompactor() {
+    return new CloudContainerCompactor() {
+      @Override
+      public void compactAssignedDeprecatedContainers(Collection<? extends PartitionId> assignedPartitions) {
+      }
+
+      @Override
+      public void shutdown() {
+      }
+    };
+  }
+
+  /**
+   * For testing.
+   * @param blobId the {@link BlobId} to look up.
+   * @return {@code true} if the blob exists in this destination, {@code false} otherwise.
+   */
+  @Override
+  public boolean doesBlobExist(BlobId blobId) {
+    return map.containsKey(blobId);
+  }
+
+  @Override
+  public void persistTokens(String partitionPath, String tokenFileName, InputStream inputStream)
+      throws CloudStorageException {
+    try {
+      tokenMap.put(partitionPath + tokenFileName, IOUtils.toByteArray(inputStream));
+    } catch (IOException e) {
+      throw new CloudStorageException("Read input stream error", e);
+    }
+  }
+
+  @Override
+  public boolean retrieveTokens(String partitionPath, String tokenFileName, OutputStream outputStream)
+      throws CloudStorageException {
+    if (tokenMap.get(partitionPath + tokenFileName) == null) {
+      return false;
+    }
+    try {
+      outputStream.write(tokenMap.get(partitionPath + tokenFileName));
+      return true;
+    } catch (IOException e) {
+      throw new CloudStorageException("Write to stream error", e);
+    }
+  }
+
+  public Map<String, byte[]> getTokenMap() {
+    return tokenMap;
+  }
+
+  int getBlobsUploaded() {
+    return blobsUploadedCounter.get();
+  }
+
+  long getBytesUploaded() {
+    return bytesUploadedCounter.get();
+  }
+
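Since the token APIs above just key raw bytes by partition plus file name, a round trip is easy to check; a minimal sketch using standard java.io streams (not part of this diff):

byte[] tokenBytes = {1, 2, 3};
dest.persistTokens("partition1", "replicaTokens", new ByteArrayInputStream(tokenBytes));

ByteArrayOutputStream out = new ByteArrayOutputStream();
boolean found = dest.retrieveTokens("partition1", "replicaTokens", out);
// found is true and out.toByteArray() equals tokenBytes; an unknown key returns false.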
+  public boolean awaitUpload(long duration, TimeUnit timeUnit) throws InterruptedException {
+    return uploadLatch.await(duration, timeUnit);
+  }
+
+  public boolean awaitDownload(long duration, TimeUnit timeUnit) throws InterruptedException {
+    return downloadLatch.await(duration, timeUnit);
+  }
+
+  /**
+   * @return the set of {@code deprecatedContainers}.
+   */
+  public Set<CosmosContainerDeletionEntry> getDeletedContainers() {
+    return deprecatedContainers;
+  }
+
+  /**
+   * Class representing the change feed for {@link LatchBasedInMemoryCloudDestination}.
+   */
+  class ChangeFeed {
+    private final Map<String, BlobId> continuationTokenToBlobIdMap = new HashMap<>();
+    private final Map<BlobId, String> blobIdToContinuationTokenMap = new HashMap<>();
+    private final String reqUuid = UUID.randomUUID().toString();
+    private int continuationTokenCounter = -1;
+
+    /**
+     * Add a blob id to the change feed. If the blob is already present in the feed, its previous entry is cleared
+     * and the blob is appended at the tail.
+     * @param blobId {@link BlobId} to add.
+     */
+    void add(BlobId blobId) {
+      if (blobIdToContinuationTokenMap.containsKey(blobId)) {
+        continuationTokenToBlobIdMap.put(blobIdToContinuationTokenMap.get(blobId), null);
+      }
+      continuationTokenToBlobIdMap.put(Integer.toString(++continuationTokenCounter), blobId);
+      blobIdToContinuationTokenMap.put(blobId, Integer.toString(continuationTokenCounter));
+    }
+
+    /**
+     * Return the continuation token for the specified {@link BlobId}.
+     * @param blobId {@link BlobId} object.
+     * @return continuation token.
+     */
+    String getContinuationTokenForBlob(BlobId blobId) {
+      return blobIdToContinuationTokenMap.get(blobId);
+    }
+
+    /**
+     * Return the continuation token to blob id map.
+     * @return {@code continuationTokenToBlobIdMap}.
+     */
+    Map<String, BlobId> getContinuationTokenToBlobIdMap() {
+      return continuationTokenToBlobIdMap;
+    }
+
+    /**
+     * Return the continuation token counter.
+     * @return {@code continuationTokenCounter}.
+     */
+    int getContinuationTokenCounter() {
+      return continuationTokenCounter;
+    }
+
+    /**
+     * Return the request uuid.
+     * @return {@code reqUuid}
+     */
+    String getReqUuid() {
+      return reqUuid;
+    }
+  }
+}
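The await methods are the class's namesake: the constructor takes the blob ids to track, and the upload latch presumably opens once all of them have been uploaded (an inference from the signatures in this diff, not something it states). A hypothetical test flow:

// imports assumed: java.util.Arrays, java.util.concurrent.TimeUnit, org.junit.Assert.assertTrue
List<BlobId> tracked = Arrays.asList(blobId1, blobId2); // blob ids under test
LatchBasedInMemoryCloudDestination dest = new LatchBasedInMemoryCloudDestination(tracked, clusterMap);
// ... start the VCR / replication that uploads the tracked blobs ...
assertTrue("uploads did not finish in time", dest.awaitUpload(10, TimeUnit.SECONDS));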
diff --git a/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestinationFactory.java b/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestinationFactory.java
new file mode 100644
index 0000000000..59cb75aae2
--- /dev/null
+++ b/ambry-vcr/src/test/java/com/github/ambry/vcr/LatchBasedInMemoryCloudDestinationFactory.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.vcr;
+
+import com.codahale.metrics.MetricRegistry;
+import com.github.ambry.account.AccountService;
+import com.github.ambry.cloud.CloudDestination;
+import com.github.ambry.cloud.CloudDestinationFactory;
+import com.github.ambry.clustermap.ClusterMap;
+import com.github.ambry.config.VerifiableProperties;
+import java.util.Collections;
+
+
+/**
+ * An implementation of {@link CloudDestinationFactory} to produce {@link LatchBasedInMemoryCloudDestination}.
+ */
+public class LatchBasedInMemoryCloudDestinationFactory implements CloudDestinationFactory {
+  LatchBasedInMemoryCloudDestination destination;
+
+  /**
+   * Instantiate {@link LatchBasedInMemoryCloudDestinationFactory}.
+   * @param destination the instance of {@link LatchBasedInMemoryCloudDestination}.
+   */
+  public LatchBasedInMemoryCloudDestinationFactory(LatchBasedInMemoryCloudDestination destination) {
+    this.destination = destination;
+  }
+
+  /**
+   * Constructor for {@link LatchBasedInMemoryCloudDestinationFactory}.
+   * @param verifiableProperties {@link VerifiableProperties} object.
+   * @param metricRegistry {@link MetricRegistry} object.
+   * @param clusterMap {@link ClusterMap} object.
+   * @param accountService {@link AccountService} object.
+   */
+  public LatchBasedInMemoryCloudDestinationFactory(VerifiableProperties verifiableProperties,
+      MetricRegistry metricRegistry, ClusterMap clusterMap, AccountService accountService) {
+    destination = new LatchBasedInMemoryCloudDestination(Collections.emptyList(), clusterMap);
+  }
+
+  @Override
+  public CloudDestination getCloudDestination() throws IllegalStateException {
+    return destination;
+  }
+}
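The factory offers two construction paths: tests inject a pre-built destination directly, while the four-argument constructor matches the shape a server can instantiate reflectively from CloudConfig.CLOUD_DESTINATION_FACTORY_CLASS (that reflective contract is inferred from the constructor signature, not shown in this diff). A direct-injection sketch:

LatchBasedInMemoryCloudDestination destination =
    new LatchBasedInMemoryCloudDestination(Collections.emptyList(), clusterMap);
CloudDestinationFactory factory = new LatchBasedInMemoryCloudDestinationFactory(destination);
// getCloudDestination() returns the injected instance, so the test keeps a handle on
// the same object the server writes to.
CloudDestination destinationUsedByServer = factory.getCloudDestination();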
diff --git a/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrServerTest.java b/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrServerTest.java
new file mode 100644
index 0000000000..286bc8d0e7
--- /dev/null
+++ b/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrServerTest.java
@@ -0,0 +1,153 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.vcr;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jmx.DefaultObjectNameFactory;
+import com.codahale.metrics.jmx.JmxReporter;
+import com.codahale.metrics.jmx.ObjectNameFactory;
+import com.github.ambry.cloud.CloudDestinationFactory;
+import com.github.ambry.cloud.HelixVcrClusterParticipant;
+import com.github.ambry.cloud.StaticVcrClusterAgentsFactory;
+import com.github.ambry.cloud.StaticVcrClusterParticipant;
+import com.github.ambry.cloud.VcrServer;
+import com.github.ambry.clustermap.MockClusterAgentsFactory;
+import com.github.ambry.clustermap.MockClusterMap;
+import com.github.ambry.commons.SSLFactory;
+import com.github.ambry.commons.TestSSLUtils;
+import com.github.ambry.config.CloudConfig;
+import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.notification.NotificationSystem;
+import com.github.ambry.utils.HelixControllerManager;
+import com.github.ambry.utils.TestUtils;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.function.Function;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Test of the VCR Server.
+ */
+public class VcrServerTest {
+
+  private static MockClusterAgentsFactory mockClusterAgentsFactory;
+  private static MockClusterMap mockClusterMap;
+  private static NotificationSystem notificationSystem;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    mockClusterAgentsFactory = new MockClusterAgentsFactory(false, true, 1, 1, 2);
+    mockClusterMap = mockClusterAgentsFactory.getClusterMap();
+    notificationSystem = mock(NotificationSystem.class);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    mockClusterMap.cleanup();
+  }
+
+  /**
+   * Bring up the VCR server and then shut it down with {@link StaticVcrClusterParticipant}.
+   * @throws Exception
+   */
+  @Test
+  public void testVCRServerWithStaticCluster() throws Exception {
+    VerifiableProperties verifiableProperties = getStaticClusterVcrProps();
+    VcrServer vcrServer = new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, null);
+    vcrServer.startup();
+    Assert.assertNull("Expected null compactor", vcrServer.getVcrReplicationManager().getCloudStorageCompactor());
+    vcrServer.shutdown();
+  }
+
+  /**
+   * Bring up the VCR server and then shut it down with {@link StaticVcrClusterParticipant} and a custom
+   * {@link JmxReporter} factory.
+   * @throws Exception
+   */
+  @Test
+  public void testVCRServerWithReporterFactory() throws Exception {
+    VerifiableProperties verifiableProperties = getStaticClusterVcrProps();
+    ObjectNameFactory spyObjectNameFactory = spy(new DefaultObjectNameFactory());
+    Function<MetricRegistry, JmxReporter> reporterFactory =
+        reporter -> JmxReporter.forRegistry(reporter).createsObjectNamesWith(spyObjectNameFactory).build();
+    VcrServer vcrServer =
+        new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, reporterFactory);
+    vcrServer.startup();
+    // check that the custom ObjectNameFactory specified in reporterFactory was used.
+    verify(spyObjectNameFactory, atLeastOnce()).createName(anyString(), anyString(), anyString());
+    vcrServer.shutdown();
+  }
+
+  /**
+   * Bring up the VCR server and then shut it down with {@link HelixVcrClusterParticipant}.
+   * @throws Exception
+   */
+  @Test
+  public void testVCRServerWithHelixCluster() throws Exception {
+    Properties serverSSLProps = new Properties();
+    File trustStoreFile = File.createTempFile("truststore", ".jks");
+    TestSSLUtils.addSSLProperties(serverSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.SERVER, trustStoreFile, "server");
+    TestSSLUtils.addHttp2Properties(serverSSLProps, SSLFactory.Mode.SERVER, true);
+    int zkPort = 31999;
+    String zkConnectString = "localhost:" + zkPort;
+    String vcrClusterName = "vcrTestCluster";
+    TestUtils.ZkInfo zkInfo = new TestUtils.ZkInfo(TestUtils.getTempDir("helixVcr"), "DC1", (byte) 1, zkPort, true);
+    HelixControllerManager helixControllerManager =
+        VcrTestUtil.populateZkInfoAndStartController(zkConnectString, vcrClusterName, mockClusterMap);
+    Properties props =
+        VcrTestUtil.createVcrProperties("DC1", vcrClusterName, zkConnectString, 12300, 12400, 12500, serverSSLProps);
+    CloudDestinationFactory cloudDestinationFactory = new LatchBasedInMemoryCloudDestinationFactory(
+        new LatchBasedInMemoryCloudDestination(Collections.emptyList(), mockClusterMap));
+    VerifiableProperties verifiableProperties = new VerifiableProperties(props);
+    VcrServer vcrServer =
+        new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, cloudDestinationFactory,
+            null);
+    vcrServer.startup();
+    Assert.assertNotNull("Expected compactor", vcrServer.getVcrReplicationManager().getCloudStorageCompactor());
+    vcrServer.shutdown();
+    helixControllerManager.syncStop();
+    zkInfo.shutdown();
+  }
+
+  /**
+   * @return {@link VerifiableProperties} to start a VCR with a static cluster.
+   */
+  private VerifiableProperties getStaticClusterVcrProps() throws Exception {
+
+    Properties serverSSLProps = new Properties();
+    File trustStoreFile = File.createTempFile("truststore", ".jks");
+    TestSSLUtils.addSSLProperties(serverSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.SERVER, trustStoreFile, "server");
+    TestSSLUtils.addHttp2Properties(serverSSLProps, SSLFactory.Mode.SERVER, true);
+    Properties props =
+        VcrTestUtil.createVcrProperties("DC1", "vcrClusterName", "", 12300, 12400, 12500, serverSSLProps);
+    props.setProperty(CloudConfig.VCR_ASSIGNED_PARTITIONS, "0,1");
+    props.setProperty(CloudConfig.VCR_CLUSTER_AGENTS_FACTORY_CLASS, StaticVcrClusterAgentsFactory.class.getName());
+    // Run this one with compaction disabled
+    props.setProperty(CloudConfig.CLOUD_BLOB_COMPACTION_ENABLED, "false");
+    props.setProperty(CloudConfig.CLOUD_DESTINATION_FACTORY_CLASS,
+        "com.github.ambry.vcr.LatchBasedInMemoryCloudDestinationFactory");
+    props.setProperty("clustermap.enable.http2.replication", "true");
+    props.setProperty("server.security.service.factory", "com.github.ambry.cloud.AmbryVcrSecurityServiceFactory");
+    return new VerifiableProperties(props);
+  }
+}
diff --git a/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrTestUtil.java b/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrTestUtil.java
new file mode 100644
index 0000000000..aa5cb6d64c
--- /dev/null
+++ b/ambry-vcr/src/test/java/com/github/ambry/vcr/VcrTestUtil.java
@@ -0,0 +1,200 @@
+/**
+ * Copyright 2019 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+package com.github.ambry.vcr;
+
+import com.github.ambry.cloud.CloudDestinationFactory;
+import com.github.ambry.cloud.HelixVcrClusterAgentsFactory;
+import com.github.ambry.cloud.OnlineOfflineHelixVcrStateModelFactory;
+import com.github.ambry.cloud.VcrServer;
+import com.github.ambry.clustermap.ClusterAgentsFactory;
+import com.github.ambry.clustermap.ClusterMap;
+import com.github.ambry.clustermap.PartitionId;
+import com.github.ambry.config.CloudConfig;
+import com.github.ambry.config.VerifiableProperties;
+import com.github.ambry.notification.NotificationSystem;
+import com.github.ambry.utils.HelixControllerManager;
+import com.github.ambry.utils.TestUtils;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+
+
+/**
+ * Utility class for VCR tests.
+ */
+public class VcrTestUtil {
+
+  public static final String helixResource = "resource1";
+  private static final int MIN_ACTIVE_REPLICAS = 0;
+  private static final long REBALANCE_DELAY_MS = TimeUnit.SECONDS.toMillis(2);
+  private static final int NUM_REPLICAS = 1;
+
+  /**
+   * Create a {@link VcrServer}.
+   * @param properties the config properties to use.
+   * @param clusterAgentsFactory the {@link ClusterAgentsFactory} to use.
+   * @param notificationSystem the {@link NotificationSystem} to use.
+   * @param cloudDestinationFactory the {@link CloudDestinationFactory} to use.
+   * @return the created VCR server.
+   */
+  public static VcrServer createVcrServer(VerifiableProperties properties, ClusterAgentsFactory clusterAgentsFactory,
+      NotificationSystem notificationSystem, CloudDestinationFactory cloudDestinationFactory) {
+    return new VcrServer(properties, clusterAgentsFactory, notificationSystem, cloudDestinationFactory, null);
+  }
+
+  /**
+   * Populate info on the ZooKeeper server and start a {@link HelixControllerManager}.
+   * @param zkConnectString zk connect string to the zk server.
+   * @param vcrClusterName the vcr cluster name.
+   * @param clusterMap the {@link ClusterMap} to use.
+   * @return the created {@link HelixControllerManager}.
+   */
+  public static HelixControllerManager populateZkInfoAndStartController(String zkConnectString, String vcrClusterName,
+      ClusterMap clusterMap) {
+    return populateZkInfoAndStartController(zkConnectString, vcrClusterName, clusterMap, OnlineOfflineSMD.name);
+  }
+
+  /**
+   * Populate info on the ZooKeeper server and start a {@link HelixControllerManager}.
+   * @param zkConnectString zk connect string to the zk server.
+   * @param vcrClusterName the vcr cluster name.
+   * @param clusterMap the {@link ClusterMap} to use.
+   * @param vcrHelixStateModel the state model to use for helix cluster events.
+   * @return the created {@link HelixControllerManager}.
+   */
+  public static HelixControllerManager populateZkInfoAndStartController(String zkConnectString, String vcrClusterName,
+      ClusterMap clusterMap, String vcrHelixStateModel) {
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkConnectString), new HelixZkClient.ZkClientConfig());
+    try {
+      zkClient.setZkSerializer(new ZNRecordSerializer());
+      ClusterSetup clusterSetup = new ClusterSetup(zkClient);
+      clusterSetup.addCluster(vcrClusterName, true);
+      HelixAdmin admin = new ZKHelixAdmin(zkClient);
+      // set ALLOW_PARTICIPANT_AUTO_JOIN so VCR nodes can join without being pre-registered
+      HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).
+          forCluster(vcrClusterName).build();
+      Map<String, String> helixClusterProperties = new HashMap<>();
+      helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
+      admin.setConfig(configScope, helixClusterProperties);
+      // set PersistBestPossibleAssignment
+      ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(vcrClusterName);
+      clusterConfig.setPersistBestPossibleAssignment(true);
+      configAccessor.setClusterConfig(vcrClusterName, clusterConfig);
+
+      FullAutoModeISBuilder builder = new FullAutoModeISBuilder(helixResource);
+      builder.setStateModel(vcrHelixStateModel);
+      for (PartitionId partitionId : clusterMap.getAllPartitionIds(null)) {
+        builder.add(partitionId.toPathString());
+      }
+      builder.setMinActiveReplica(MIN_ACTIVE_REPLICAS);
+      builder.setRebalanceDelay((int) REBALANCE_DELAY_MS);
+      builder.setRebalancerClass(DelayedAutoRebalancer.class.getName());
+      builder.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
+      IdealState idealState = builder.build();
+      admin.addResource(vcrClusterName, helixResource, idealState);
+      admin.rebalance(vcrClusterName, helixResource, NUM_REPLICAS, "", "");
+      HelixControllerManager helixControllerManager = new HelixControllerManager(zkConnectString, vcrClusterName);
+      helixControllerManager.syncStart();
+      return helixControllerManager;
+    } finally {
+      zkClient.close();
+    }
+  }
+
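For context, the ALLOW_PARTICIPANT_AUTO_JOIN flag set above is what lets each VCR node register itself on first connect. A rough sketch of the participant side using the stock Helix API (illustrative only; Ambry's actual participant wiring lives in HelixVcrClusterParticipant and may differ, and the instance name here is hypothetical):

// imports assumed: org.apache.helix.HelixManager, org.apache.helix.HelixManagerFactory,
//                  org.apache.helix.InstanceType
HelixManager participant = HelixManagerFactory.getZKHelixManager(vcrClusterName, "localhost_12300",
    InstanceType.PARTICIPANT, zkConnectString);
participant.getStateMachineEngine()
    .registerStateModelFactory(OnlineOfflineSMD.name, stateModelFactory); // e.g. an online-offline factory
participant.connect(); // auto-join kicks in because of the cluster config set above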
+  /**
+   * Create a {@link Properties} for the VCR.
+   * @param datacenter the datacenter to use.
+   * @param vcrClusterName the vcrClusterName to use.
+   * @param zkConnectString the zkConnectString to use.
+   * @param clusterMapPort the clusterMapPort to use.
+   * @param vcrSslPort the vcrSslPort to use.
+   * @param vcrHttp2Port the vcrHttp2Port to use.
+   * @param vcrSSLProps the SSL properties to use, if any. Can be {@code null}.
+   * @return the created VCR {@link Properties}.
+   */
+  public static Properties createVcrProperties(String datacenter, String vcrClusterName, String zkConnectString,
+      int clusterMapPort, int vcrSslPort, int vcrHttp2Port, Properties vcrSSLProps) {
+    return createVcrProperties(datacenter, vcrClusterName, zkConnectString, clusterMapPort, vcrSslPort, vcrHttp2Port,
+        vcrSSLProps, OnlineOfflineHelixVcrStateModelFactory.class.getName(), false);
+  }
+
+  /**
+   * Create a {@link Properties} for the VCR.
+   * @param datacenter the datacenter to use.
+   * @param vcrClusterName the vcrClusterName to use.
+   * @param zkConnectString the zkConnectString to use.
+   * @param clusterMapPort the clusterMapPort to use.
+   * @param vcrSslPort the vcrSslPort to use.
+   * @param vcrHttp2Port the vcrHttp2Port to use.
+   * @param vcrSSLProps the SSL properties to use, if any. Can be {@code null}.
+   * @param vcrHelixStateModelFactoryClass the state model factory class.
+   * @param enableHttp2Replication whether to enable http2 replication.
+   * @return the created VCR {@link Properties}.
+   */
+  public static Properties createVcrProperties(String datacenter, String vcrClusterName, String zkConnectString,
+      int clusterMapPort, int vcrSslPort, int vcrHttp2Port, Properties vcrSSLProps,
+      String vcrHelixStateModelFactoryClass, boolean enableHttp2Replication) {
+    // Properties to start the VCR and CloudBackupManager
+    Properties props = new Properties();
+    props.setProperty(CloudConfig.CLOUD_IS_VCR, Boolean.TRUE.toString());
+    props.setProperty("connectionpool.read.timeout.ms", "15000");
+    props.setProperty("server.scheduler.num.of.threads", "1");
+    props.setProperty("num.io.threads", "1");
+    props.setProperty("clustermap.host.name", "localhost");
+    props.setProperty("clustermap.resolve.hostnames", "false");
+    props.setProperty("clustermap.cluster.name", "thisIsClusterName");
+    props.setProperty("clustermap.datacenter.name", datacenter);
+    props.setProperty("vcr.source.datacenters", datacenter);
+    props.setProperty("clustermap.port", Integer.toString(clusterMapPort));
+    props.setProperty("port", Integer.toString(clusterMapPort));
+    props.setProperty("vcr.helix.state.model.factory.class", vcrHelixStateModelFactoryClass);
+    props.setProperty("server.security.service.factory", "com.github.ambry.cloud.AmbryVcrSecurityServiceFactory");
+    if (vcrSSLProps == null) {
+      props.setProperty("clustermap.ssl.enabled.datacenters", "");
+    } else {
+      props.putAll(vcrSSLProps);
+      props.setProperty("clustermap.ssl.enabled.datacenters", datacenter);
+      props.setProperty(CloudConfig.VCR_SSL_PORT, Integer.toString(vcrSslPort));
+      props.setProperty(CloudConfig.VCR_HTTP2_PORT, Integer.toString(vcrHttp2Port));
+    }
+    props.setProperty(CloudConfig.VCR_CLUSTER_NAME, vcrClusterName);
+    props.setProperty(CloudConfig.VCR_CLUSTER_AGENTS_FACTORY_CLASS, HelixVcrClusterAgentsFactory.class.getName());
+    props.setProperty(CloudConfig.VCR_CLUSTER_ZK_CONNECT_STRING, zkConnectString);
+    props.setProperty(CloudConfig.KMS_SERVICE_KEY_CONTEXT, TestUtils.getRandomKey(32));
+    props.setProperty("kms.default.container.key", TestUtils.getRandomKey(16));
+    props.setProperty("replication.token.flush.delay.seconds", "100000");
+    props.setProperty("replication.token.flush.interval.seconds", "500000");
+    props.setProperty("clustermap.enable.http2.replication", Boolean.toString(enableHttp2Replication));
+    return props;
+  }
+}
diff --git a/build.gradle b/build.gradle
index 32a149a21a..77e3b0f9bd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -508,6 +508,18 @@ project(':ambry-cloud') {
   }
 }
 
+project(':ambry-vcr') {
+  dependencies {
+    compile project(':ambry-server'),
+        project(':ambry-cloud')
+
+    testCompile project(':ambry-commons')
+    testCompile project(':ambry-test-utils')
+    testCompile project(':ambry-utils')
+    testCompile project(path: ':ambry-clustermap', configuration: 'testArchives')
+  }
+}
+
 project(':ambry-named-mysql') {
   dependencies {
     compile project(':ambry-api')
diff --git a/settings.gradle b/settings.gradle
index e057e429ce..d19582e993 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,5 +29,6 @@ include 'ambry-api',
   'ambry-all',
   'ambry-quota',
   'ambry-mysql',
-  'ambry-filesystem'
+  'ambry-filesystem',
+  'ambry-vcr'