Skip to content

Commit

Permalink
Improvement: Pseudonymized msisdn before pushing to PubSub.
Browse files Browse the repository at this point in the history
Fix: Connect to datastore emulator when running in GCP via CI/CD.
  • Loading branch information
vihangpatil committed Aug 31, 2018
1 parent 5e36d9f commit 89aaa26
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import io.dropwizard.lifecycle.Managed
import org.ostelco.analytics.api.DataTrafficInfo
import org.ostelco.prime.analytics.ConfigRegistry.config
import org.ostelco.prime.logger
import org.ostelco.prime.module.getResource
import org.ostelco.prime.pseudonymizer.PseudonymizerService
import java.io.IOException
import java.time.Instant

Expand All @@ -21,6 +23,8 @@ object DataConsumptionInfoPublisher : Managed {

private val logger by logger()

private val pseudonymizerService by lazy { getResource<PseudonymizerService>() }

private lateinit var publisher: Publisher

@Throws(IOException::class)
Expand All @@ -40,11 +44,18 @@ object DataConsumptionInfoPublisher : Managed {

fun publish(msisdn: String, usedBucketBytes: Long, bundleBytes: Long) {

if (usedBucketBytes == 0L) {
return
}

val now = Instant.now().toEpochMilli()
val pseudonym = pseudonymizerService.getPseudonymEntityFor(msisdn, now).pseudonym

val data = DataTrafficInfo.newBuilder()
.setMsisdn(msisdn)
.setMsisdn(pseudonym)
.setBucketBytes(usedBucketBytes)
.setBundleBytes(bundleBytes)
.setTimestamp(Timestamps.fromMillis(Instant.now().toEpochMilli()))
.setTimestamp(Timestamps.fromMillis(now))
.build()
.toByteString()

Expand Down
1 change: 0 additions & 1 deletion docker-compose.override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ services:
ipv4_address: 172.16.238.4
default:


metrics-esp:
container_name: metrics-esp
image: gcr.io/endpoints-release/endpoints-runtime:1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.ostelco.prime.pseudonymizer

import org.ostelco.prime.model.ActivePseudonyms
import org.ostelco.prime.model.PseudonymEntity

/**
 * Lookup of pseudonyms for a subscriber identified by MSISDN.
 *
 * The implementation visible in this change is `PseudonymizerServiceSingleton`.
 */
interface PseudonymizerService {

/**
 * Returns the [ActivePseudonyms] for the given [msisdn].
 *
 * @param msisdn the subscriber's MSISDN.
 */
fun getActivePseudonymsForMsisdn(msisdn: String): ActivePseudonyms

/**
 * Returns the [PseudonymEntity] that applies to [msisdn] at [timestamp].
 *
 * @param msisdn the subscriber's MSISDN.
 * @param timestamp point in time to resolve the pseudonym for; the callers in this change
 * pass epoch milliseconds (`Instant.now().toEpochMilli()`).
 */
fun getPseudonymEntityFor(msisdn: String, timestamp: Long): PseudonymEntity
}
2 changes: 1 addition & 1 deletion prime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sourceSets {
}
}

version = "1.12.0"
version = "1.13.0"

repositories {
maven {
Expand Down
2 changes: 2 additions & 0 deletions prime/config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ modules:
config:
lowBalanceThreshold: 0
- type: pseudonymizer
config:
datastoreType: emulator
- type: api
config:
authenticationCachePolicy: maximumSize=10000, expireAfterAccess=10m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PseudonymModule : PrimeModule {
}

override fun init(env: Environment) {
PseudonymizerServiceSingleton.init()
PseudonymizerServiceSingleton.init(env = env)
env.jersey().register(PseudonymResource())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package org.ostelco.pseudonym.resources

import org.hibernate.validator.constraints.NotBlank
import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton
import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton.getExportTask
import org.ostelco.pseudonym.service.PseudonymizerServiceSingleton.getPseudonymEntityFor
import org.slf4j.LoggerFactory
import java.time.Instant
import javax.ws.rs.DELETE
Expand Down Expand Up @@ -53,7 +51,7 @@ class PseudonymResource {
fun getPseudonym(@NotBlank @PathParam("msisdn") msisdn: String): Response {
val timestamp = Instant.now().toEpochMilli()
logger.info("GET pseudonym for Msisdn = $msisdn at current time, timestamp = $timestamp")
val entity = getPseudonymEntityFor(msisdn, timestamp)
val entity = PseudonymizerServiceSingleton.getPseudonymEntityFor(msisdn, timestamp)
return Response.ok(entity, MediaType.APPLICATION_JSON).build()
}

Expand Down Expand Up @@ -119,7 +117,7 @@ class PseudonymResource {
@Path("/exportstatus/{exportId}")
fun getExportStatus(@NotBlank @PathParam("exportId") exportId: String): Response {
logger.info("GET status of export $exportId")
return getExportTask(exportId)
return PseudonymizerServiceSingleton.getExportTask(exportId)
?.let { Response.ok(it, MediaType.APPLICATION_JSON).build() }
?: Response.status(Status.NOT_FOUND).build()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.ostelco.pseudonym.managed
package org.ostelco.pseudonym.service

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.Field
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.ostelco.pseudonym.service

import com.codahale.metrics.health.HealthCheck
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.datastore.Datastore
Expand All @@ -9,19 +10,20 @@ import com.google.cloud.datastore.Key
import com.google.cloud.datastore.Query
import com.google.cloud.datastore.StructuredQuery.PropertyFilter
import com.google.cloud.datastore.testing.LocalDatastoreHelper
import org.hibernate.validator.constraints.NotBlank
import com.google.cloud.http.HttpTransportOptions
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import io.dropwizard.setup.Environment
import org.ostelco.prime.logger
import org.ostelco.prime.model.ActivePseudonyms
import org.ostelco.prime.model.PseudonymEntity
import org.ostelco.prime.pseudonymizer.PseudonymizerService
import org.ostelco.pseudonym.ConfigRegistry
import org.ostelco.pseudonym.ExportTaskKind
import org.ostelco.pseudonym.PseudonymEntityKind
import org.ostelco.pseudonym.PseudonymServerConfig
import org.ostelco.pseudonym.endPropertyName
import org.ostelco.pseudonym.errorPropertyName
import org.ostelco.pseudonym.exportIdPropertyName
import org.ostelco.pseudonym.managed.PseudonymExport
import org.ostelco.pseudonym.msisdnPropertyName
import org.ostelco.pseudonym.pseudonymPropertyName
import org.ostelco.pseudonym.resources.ExportTask
Expand Down Expand Up @@ -62,21 +64,24 @@ object PseudonymizerServiceSingleton : PseudonymizerService {
private val logger by logger()

private lateinit var datastore: Datastore
private lateinit var bigquery: BigQuery
private var bigQuery: BigQuery? = null
private val dateBounds: DateBounds = WeeklyBounds()

private val executor = Executors.newFixedThreadPool(3)

fun init(bq: BigQuery? = null) {
datastore = getDatastore(ConfigRegistry.config)
if (bq != null) {
bigquery = bq
val pseudonymCache: Cache<String, PseudonymEntity> = CacheBuilder.newBuilder()
.maximumSize(5000)
.build()

fun init(env: Environment?, bq: BigQuery? = null) {

initDatastore(env)

bigQuery = bq ?: if (System.getenv("LOCAL_TESTING") != "true") {
BigQueryOptions.getDefaultInstance().service
} else {
if (System.getenv("LOCAL_TESTING") != "true") {
bigquery = BigQueryOptions.getDefaultInstance().service
} else {
logger.info("Local testing, BigQuery is not available...")
}
logger.info("Local testing, BigQuery is not available...")
null
}
}

Expand All @@ -89,6 +94,14 @@ object PseudonymizerServiceSingleton : PseudonymizerService {
return ActivePseudonyms(current, next)
}

/**
 * Returns the pseudonym entity for [msisdn] at [timestamp], going through [pseudonymCache].
 *
 * The bounds and the key prefix are derived from [dateBounds]; the key prefix is also used as
 * the cache key.
 *
 * @param msisdn the subscriber's MSISDN.
 * @param timestamp point in time used to select the bounds.
 * @return the cached, stored or newly created [PseudonymEntity].
 */
override fun getPseudonymEntityFor(msisdn: String, timestamp: Long): PseudonymEntity {
val (bounds, keyPrefix) = dateBounds.getBoundsNKeyPrefix(msisdn, timestamp)
// Read-through lookup: on a cache miss the loader below fetches the stored entity for this
// key prefix and, if there is none, creates a new pseudonym; the result is then cached.
// NOTE(review): Guava's Cache.get wraps exceptions thrown by the loader - confirm callers cope.
return pseudonymCache.get(keyPrefix) {
getPseudonymEntity(keyPrefix) ?: createPseudonym(msisdn, bounds, keyPrefix)
}
}

fun findPseudonym(pseudonym: String): PseudonymEntity? {
val query = Query.newEntityQueryBuilder()
.setKind(PseudonymEntityKind)
Expand Down Expand Up @@ -121,37 +134,60 @@ object PseudonymizerServiceSingleton : PseudonymizerService {
}

fun exportPseudonyms(exportId: String) {
logger.info("GET export all pseudonyms to the table $exportId")
val exporter = PseudonymExport(exportId, bigquery, datastore)
executor.execute(exporter.getRunnable())
bigQuery?.apply {
logger.info("GET export all pseudonyms to the table $exportId")
val exporter = PseudonymExport(exportId = exportId, bigquery = this, datastore = datastore)
executor.execute(exporter.getRunnable())
}
}

// Integration testing helper for Datastore.
private fun getDatastore(config: PseudonymServerConfig): Datastore {
val datastore: Datastore?
if (config.datastoreType == "inmemory-emulator") {
logger.info("Starting with in-memory datastore emulator...")
val helper: LocalDatastoreHelper = LocalDatastoreHelper.create(1.0)
helper.start()
datastore = helper.options.service
} else {
datastore = DatastoreOptions.getDefaultInstance().service
logger.info("Created default instance of datastore client")

// TODO vihang: make this part of health-check
val testKey = datastore.newKeyFactory().setKind("TestKind").newKey("testKey")
val testPropertyKey = "testPropertyKey"
val testPropertyValue = "testPropertyValue"
val testEntity = Entity.newBuilder(testKey).set(testPropertyKey, testPropertyValue).build()
datastore.put(testEntity)
val value = datastore.get(testKey).getString(testPropertyKey)
if (testPropertyValue != value) {
logger.warn("Unable to fetch test property value from datastore")
private fun initDatastore(env: Environment?) {
datastore = when (ConfigRegistry.config.datastoreType) {
"inmemory-emulator" -> {
logger.info("Starting with in-memory datastore emulator")
val helper: LocalDatastoreHelper = LocalDatastoreHelper.create(1.0)
helper.start()
helper.options
}
datastore.delete(testKey)
// END
}
return datastore
"emulator" -> {
// When prime runs in GCP under the hosted CI/CD, the Datastore client library assumes it is running in
// production and ignores our instruction to connect to the datastore emulator. So, we explicitly
// connect to the emulator here.
logger.info("Connecting to datastore emulator")
DatastoreOptions
.newBuilder()
.setHost("localhost:9090")
.setTransportOptions(HttpTransportOptions.newBuilder().build())
.build()
}
else -> {
logger.info("Created default instance of datastore client")
DatastoreOptions.getDefaultInstance()
}
}.service

// health-check for datastore
env?.healthChecks()?.register("datastore", object : HealthCheck() {
/**
 * Probes the datastore with a write / read / delete round trip on a fixed test entity
 * (kind "TestKind", key "testKey").
 *
 * @return healthy when the value read back equals the value written; unhealthy when it
 * differs, or when any exception is thrown during the round trip (the exception is attached
 * to the result).
 */
override fun check(): Result {
try {
val testKey = datastore.newKeyFactory().setKind("TestKind").newKey("testKey")
val testPropertyKey = "testPropertyKey"
val testPropertyValue = "testPropertyValue"
val testEntity = Entity.newBuilder(testKey).set(testPropertyKey, testPropertyValue).build()
datastore.put(testEntity)
val value = datastore.get(testKey).getString(testPropertyKey)
// The test entity is removed before the comparison, so it is cleaned up on a mismatch too.
datastore.delete(testKey)
if (testPropertyValue != value) {
logger.warn("Unable to fetch test property value from datastore")
return Result.builder().unhealthy().build()
}
return Result.builder().healthy().build()
} catch (e: Exception) {
// Any failure of the round trip is reported as unhealthy rather than propagated.
return Result.builder().unhealthy(e).build()
}
}
})
}

fun getExportTask(exportId: String): ExportTask? {
Expand Down Expand Up @@ -181,15 +217,6 @@ object PseudonymizerServiceSingleton : PseudonymizerService {
return null
}

fun getPseudonymEntityFor(@NotBlank msisdn: String, timestamp: Long): PseudonymEntity {
val (bounds, keyPrefix) = dateBounds.getBoundsNKeyPrefix(msisdn, timestamp)
var entity = getPseudonymEntity(keyPrefix)
if (entity == null) {
entity = createPseudonym(msisdn, bounds, keyPrefix)
}
return entity
}

private fun createPseudonym(msisdn: String, bounds: Bounds, keyPrefix: String): PseudonymEntity {
val uuid = UUID.randomUUID().toString()
var entity = PseudonymEntity(msisdn, uuid, bounds.start, bounds.end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PseudonymResourceTest {
init {
ConfigRegistry.config = PseudonymServerConfig()
.apply { this.datastoreType = "inmemory-emulator" }
PseudonymizerServiceSingleton.init(mock(BigQuery::class.java))
PseudonymizerServiceSingleton.init(env = null, bq = mock(BigQuery::class.java))
}

@ClassRule
Expand Down

0 comments on commit 89aaa26

Please sign in to comment.