From 89aaa26cb2c7f4ca711d3b93967ab83cd0bfd3c9 Mon Sep 17 00:00:00 2001 From: Vihang Patil Date: Fri, 31 Aug 2018 05:30:49 +0200 Subject: [PATCH] Improvement: Pseudonymized msisdn before pushing to PubSub. Fix: Connect to datastore emulator when running in GCP via CI/CD. --- .../DataConsumptionInfoPublisher.kt | 15 ++- docker-compose.override.yaml | 1 - .../pseudonymizer/PseudonymizerService.kt | 4 + prime/build.gradle | 2 +- prime/config/test.yaml | 2 + .../org/ostelco/pseudonym/PseudonymModule.kt | 2 +- .../pseudonym/resources/PseudonymResource.kt | 6 +- .../{managed => service}/PseudonymExport.kt | 2 +- .../service/PseudonymizerServiceSingleton.kt | 125 +++++++++++------- .../pseudonym/PseudonymResourceTest.kt | 2 +- 10 files changed, 101 insertions(+), 60 deletions(-) rename pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/{managed => service}/PseudonymExport.kt (99%) diff --git a/analytics-module/src/main/kotlin/org/ostelco/prime/analytics/publishers/DataConsumptionInfoPublisher.kt b/analytics-module/src/main/kotlin/org/ostelco/prime/analytics/publishers/DataConsumptionInfoPublisher.kt index feec66edc..674750198 100644 --- a/analytics-module/src/main/kotlin/org/ostelco/prime/analytics/publishers/DataConsumptionInfoPublisher.kt +++ b/analytics-module/src/main/kotlin/org/ostelco/prime/analytics/publishers/DataConsumptionInfoPublisher.kt @@ -11,6 +11,8 @@ import io.dropwizard.lifecycle.Managed import org.ostelco.analytics.api.DataTrafficInfo import org.ostelco.prime.analytics.ConfigRegistry.config import org.ostelco.prime.logger +import org.ostelco.prime.module.getResource +import org.ostelco.prime.pseudonymizer.PseudonymizerService import java.io.IOException import java.time.Instant @@ -21,6 +23,8 @@ object DataConsumptionInfoPublisher : Managed { private val logger by logger() + private val pseudonymizerService by lazy { getResource() } + private lateinit var publisher: Publisher @Throws(IOException::class) @@ -40,11 +44,18 @@ object DataConsumptionInfoPublisher : Managed { fun publish(msisdn: String, usedBucketBytes: Long, bundleBytes: Long) { + if (usedBucketBytes == 0L) { + return + } + + val now = Instant.now().toEpochMilli() + val pseudonym = pseudonymizerService.getPseudonymEntityFor(msisdn, now).pseudonym + val data = DataTrafficInfo.newBuilder() - .setMsisdn(msisdn) + .setMsisdn(pseudonym) .setBucketBytes(usedBucketBytes) .setBundleBytes(bundleBytes) - .setTimestamp(Timestamps.fromMillis(Instant.now().toEpochMilli())) + .setTimestamp(Timestamps.fromMillis(now)) .build() .toByteString() diff --git a/docker-compose.override.yaml b/docker-compose.override.yaml index 2923d1a8d..9d810bf56 100644 --- a/docker-compose.override.yaml +++ b/docker-compose.override.yaml @@ -50,7 +50,6 @@ services: ipv4_address: 172.16.238.4 default: - metrics-esp: container_name: metrics-esp image: gcr.io/endpoints-release/endpoints-runtime:1 diff --git a/prime-api/src/main/kotlin/org/ostelco/prime/pseudonymizer/PseudonymizerService.kt b/prime-api/src/main/kotlin/org/ostelco/prime/pseudonymizer/PseudonymizerService.kt index 721d50c6f..f48b214c7 100644 --- a/prime-api/src/main/kotlin/org/ostelco/prime/pseudonymizer/PseudonymizerService.kt +++ b/prime-api/src/main/kotlin/org/ostelco/prime/pseudonymizer/PseudonymizerService.kt @@ -1,7 +1,11 @@ package org.ostelco.prime.pseudonymizer import org.ostelco.prime.model.ActivePseudonyms +import org.ostelco.prime.model.PseudonymEntity interface PseudonymizerService { + fun getActivePseudonymsForMsisdn(msisdn: String): ActivePseudonyms + + fun getPseudonymEntityFor(msisdn: String, timestamp: Long): PseudonymEntity } \ No newline at end of file diff --git a/prime/build.gradle b/prime/build.gradle index dd7b71803..c295eb5a3 100644 --- a/prime/build.gradle +++ b/prime/build.gradle @@ -18,7 +18,7 @@ sourceSets { } } -version = "1.12.0" +version = "1.13.0" repositories { maven { diff --git a/prime/config/test.yaml b/prime/config/test.yaml index 68e56c2d9..11def9a99 100644 --- a/prime/config/test.yaml +++ b/prime/config/test.yaml @@ -18,6 +18,8 @@ modules: config: lowBalanceThreshold: 0 - type: pseudonymizer + config: + datastoreType: emulator - type: api config: authenticationCachePolicy: maximumSize=10000, expireAfterAccess=10m diff --git a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/PseudonymModule.kt b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/PseudonymModule.kt index 762e270c7..2a79b6e80 100644 --- a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/PseudonymModule.kt +++ b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/PseudonymModule.kt @@ -17,7 +17,7 @@ class PseudonymModule : PrimeModule { } override fun init(env: Environment) { - PseudonymizerServiceSingleton.init() + PseudonymizerServiceSingleton.init(env = env) env.jersey().register(PseudonymResource()) } } diff --git a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/resources/PseudonymResource.kt b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/resources/PseudonymResource.kt index b32be3f0c..054bdabf2 100644 --- a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/resources/PseudonymResource.kt +++ b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/resources/PseudonymResource.kt @@ -2,8 +2,6 @@ package org.ostelco.pseudonym.resources import org.hibernate.validator.constraints.NotBlank import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton -import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton.getExportTask -import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton.getPseudonymEntityFor import org.slf4j.LoggerFactory import java.time.Instant import javax.ws.rs.DELETE @@ -53,7 +51,7 @@ class PseudonymResource { fun getPseudonym(@NotBlank @PathParam("msisdn") msisdn: String): Response { val timestamp = Instant.now().toEpochMilli() logger.info("GET pseudonym for Msisdn = $msisdn at current time, timestamp = $timestamp") - val entity = getPseudonymEntityFor(msisdn, timestamp) + val entity = PseudonymizerServiceSingleton.getPseudonymEntityFor(msisdn, timestamp) return Response.ok(entity, MediaType.APPLICATION_JSON).build() } @@ -119,7 +117,7 @@ class PseudonymResource { @Path("/exportstatus/{exportId}") fun getExportStatus(@NotBlank @PathParam("exportId") exportId: String): Response { logger.info("GET status of export $exportId") - return getExportTask(exportId) + return PseudonymizerServiceSingleton.getExportTask(exportId) ?.let { Response.ok(it, MediaType.APPLICATION_JSON).build() } ?: Response.status(Status.NOT_FOUND).build() } diff --git a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/managed/PseudonymExport.kt b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt similarity index 99% rename from pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/managed/PseudonymExport.kt rename to pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt index 856464e80..e94c9872d 100644 --- a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/managed/PseudonymExport.kt +++ b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt @@ -1,4 +1,4 @@ -package org.ostelco.pseudonym.managed +package org.ostelco.pseudonym.service import com.google.cloud.bigquery.BigQuery import com.google.cloud.bigquery.Field diff --git a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymizerServiceSingleton.kt b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymizerServiceSingleton.kt index 6533c1b16..44509bb39 100644 --- a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymizerServiceSingleton.kt +++ b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymizerServiceSingleton.kt @@ -1,5 +1,6 @@ package org.ostelco.pseudonym.service +import com.codahale.metrics.health.HealthCheck import com.google.cloud.bigquery.BigQuery import com.google.cloud.bigquery.BigQueryOptions import com.google.cloud.datastore.Datastore @@ -9,7 +10,10 @@ import com.google.cloud.datastore.Key import com.google.cloud.datastore.Query import com.google.cloud.datastore.StructuredQuery.PropertyFilter import com.google.cloud.datastore.testing.LocalDatastoreHelper -import org.hibernate.validator.constraints.NotBlank +import com.google.cloud.http.HttpTransportOptions +import com.google.common.cache.Cache +import com.google.common.cache.CacheBuilder +import io.dropwizard.setup.Environment import org.ostelco.prime.logger import org.ostelco.prime.model.ActivePseudonyms import org.ostelco.prime.model.PseudonymEntity @@ -17,11 +21,9 @@ import org.ostelco.prime.pseudonymizer.PseudonymizerService import org.ostelco.pseudonym.ConfigRegistry import org.ostelco.pseudonym.ExportTaskKind import org.ostelco.pseudonym.PseudonymEntityKind -import org.ostelco.pseudonym.PseudonymServerConfig import org.ostelco.pseudonym.endPropertyName import org.ostelco.pseudonym.errorPropertyName import org.ostelco.pseudonym.exportIdPropertyName -import org.ostelco.pseudonym.managed.PseudonymExport import org.ostelco.pseudonym.msisdnPropertyName import org.ostelco.pseudonym.pseudonymPropertyName import org.ostelco.pseudonym.resources.ExportTask @@ -62,21 +64,24 @@ object PseudonymizerServiceSingleton : PseudonymizerService { private val logger by logger() private lateinit var datastore: Datastore - private lateinit var bigquery: BigQuery + private var bigQuery: BigQuery? = null private val dateBounds: DateBounds = WeeklyBounds() private val executor = Executors.newFixedThreadPool(3) - fun init(bq: BigQuery? = null) { - datastore = getDatastore(ConfigRegistry.config) - if (bq != null) { - bigquery = bq + val pseudonymCache: Cache = CacheBuilder.newBuilder() + .maximumSize(5000) + .build() + + fun init(env: Environment?, bq: BigQuery? = null) { + + initDatastore(env) + + bigQuery = bq ?: if (System.getenv("LOCAL_TESTING") != "true") { + BigQueryOptions.getDefaultInstance().service } else { - if (System.getenv("LOCAL_TESTING") != "true") { - bigquery = BigQueryOptions.getDefaultInstance().service - } else { - logger.info("Local testing, BigQuery is not available...") - } + logger.info("Local testing, BigQuery is not available...") + null } } @@ -89,6 +94,14 @@ object PseudonymizerServiceSingleton : PseudonymizerService { return ActivePseudonyms(current, next) } + override fun getPseudonymEntityFor(msisdn: String, timestamp: Long): PseudonymEntity { + val (bounds, keyPrefix) = dateBounds.getBoundsNKeyPrefix(msisdn, timestamp) + // Retrieves the element from cache. + return pseudonymCache.get(keyPrefix) { + getPseudonymEntity(keyPrefix) ?: createPseudonym(msisdn, bounds, keyPrefix) + } + } + fun findPseudonym(pseudonym: String): PseudonymEntity? { val query = Query.newEntityQueryBuilder() .setKind(PseudonymEntityKind) @@ -121,37 +134,60 @@ object PseudonymizerServiceSingleton : PseudonymizerService { } fun exportPseudonyms(exportId: String) { - logger.info("GET export all pseudonyms to the table $exportId") - val exporter = PseudonymExport(exportId, bigquery, datastore) - executor.execute(exporter.getRunnable()) + bigQuery?.apply { + logger.info("GET export all pseudonyms to the table $exportId") + val exporter = PseudonymExport(exportId = exportId, bigquery = this, datastore = datastore) + executor.execute(exporter.getRunnable()) + } } // Integration testing helper for Datastore. - private fun getDatastore(config: PseudonymServerConfig): Datastore { - val datastore: Datastore? - if (config.datastoreType == "inmemory-emulator") { - logger.info("Starting with in-memory datastore emulator...") - val helper: LocalDatastoreHelper = LocalDatastoreHelper.create(1.0) - helper.start() - datastore = helper.options.service - } else { - datastore = DatastoreOptions.getDefaultInstance().service - logger.info("Created default instance of datastore client") - - // TODO vihang: make this part of health-check - val testKey = datastore.newKeyFactory().setKind("TestKind").newKey("testKey") - val testPropertyKey = "testPropertyKey" - val testPropertyValue = "testPropertyValue" - val testEntity = Entity.newBuilder(testKey).set(testPropertyKey, testPropertyValue).build() - datastore.put(testEntity) - val value = datastore.get(testKey).getString(testPropertyKey) - if (testPropertyValue != value) { - logger.warn("Unable to fetch test property value from datastore") + private fun initDatastore(env: Environment?) { + datastore = when (ConfigRegistry.config.datastoreType) { + "inmemory-emulator" -> { + logger.info("Starting with in-memory datastore emulator") + val helper: LocalDatastoreHelper = LocalDatastoreHelper.create(1.0) + helper.start() + helper.options } - datastore.delete(testKey) - // END - } - return datastore + "emulator" -> { + // When prime running in GCP by hosted CI/CD, Datastore client library assumes it is running in + // production and ignore our instruction to connect to the datastore emulator. So, we are explicitly + // connecting to emulator + logger.info("Connecting to datastore emulator") + DatastoreOptions + .newBuilder() + .setHost("localhost:9090") + .setTransportOptions(HttpTransportOptions.newBuilder().build()) + .build() + } + else -> { + logger.info("Created default instance of datastore client") + DatastoreOptions.getDefaultInstance() + } + }.service + + // health-check for datastore + env?.healthChecks()?.register("datastore", object : HealthCheck() { + override fun check(): Result { + try { + val testKey = datastore.newKeyFactory().setKind("TestKind").newKey("testKey") + val testPropertyKey = "testPropertyKey" + val testPropertyValue = "testPropertyValue" + val testEntity = Entity.newBuilder(testKey).set(testPropertyKey, testPropertyValue).build() + datastore.put(testEntity) + val value = datastore.get(testKey).getString(testPropertyKey) + datastore.delete(testKey) + if (testPropertyValue != value) { + logger.warn("Unable to fetch test property value from datastore") + return Result.builder().unhealthy().build() + } + return Result.builder().healthy().build() + } catch (e: Exception) { + return Result.builder().unhealthy(e).build() + } + } + }) } fun getExportTask(exportId: String): ExportTask? { @@ -181,15 +217,6 @@ object PseudonymizerServiceSingleton : PseudonymizerService { return null } - fun getPseudonymEntityFor(@NotBlank msisdn: String, timestamp: Long): PseudonymEntity { - val (bounds, keyPrefix) = dateBounds.getBoundsNKeyPrefix(msisdn, timestamp) - var entity = getPseudonymEntity(keyPrefix) - if (entity == null) { - entity = createPseudonym(msisdn, bounds, keyPrefix) - } - return entity - } - private fun createPseudonym(msisdn: String, bounds: Bounds, keyPrefix: String): PseudonymEntity { val uuid = UUID.randomUUID().toString() var entity = PseudonymEntity(msisdn, uuid, bounds.start, bounds.end) diff --git a/pseudonym-server/src/test/kotlin/org/ostelco/pseudonym/PseudonymResourceTest.kt b/pseudonym-server/src/test/kotlin/org/ostelco/pseudonym/PseudonymResourceTest.kt index a217f19ff..7c9d6ce98 100644 --- a/pseudonym-server/src/test/kotlin/org/ostelco/pseudonym/PseudonymResourceTest.kt +++ b/pseudonym-server/src/test/kotlin/org/ostelco/pseudonym/PseudonymResourceTest.kt @@ -34,7 +34,7 @@ class PseudonymResourceTest { init { ConfigRegistry.config = PseudonymServerConfig() .apply { this.datastoreType = "inmemory-emulator" } - PseudonymizerServiceSingleton.init(mock(BigQuery::class.java)) + PseudonymizerServiceSingleton.init(env = null, bq = mock(BigQuery::class.java)) } @ClassRule