From 3d73007fbda0ca83609fa081ebb46db0d69f995f Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Mon, 24 Sep 2018 19:44:12 +0200
Subject: [PATCH 1/6] Export Subscriber to MSISDN mappings

---
 .../pseudonym/service/PseudonymExport.kt | 164 ++++++++++++++----
 1 file changed, 126 insertions(+), 38 deletions(-)

diff --git a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt
index edfa18e45..8ed1adea1 100644
--- a/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt
+++ b/pseudonym-server/src/main/kotlin/org/ostelco/pseudonym/service/PseudonymExport.kt
@@ -17,13 +17,21 @@ import com.google.cloud.datastore.StructuredQuery
 import com.google.common.cache.Cache
 import com.google.common.cache.CacheBuilder
 import org.apache.commons.codec.binary.Hex
+import org.ostelco.prime.model.Subscriber
+import org.ostelco.prime.model.Subscription
+import org.ostelco.prime.module.getResource
+import org.ostelco.prime.storage.AdminDataSource
 import org.ostelco.pseudonym.*
 import org.slf4j.LoggerFactory
-import java.util.*
+import java.net.URLEncoder
 import java.security.MessageDigest
+import java.util.UUID
 
 private const val datasetName = "exported_pseudonyms"
+private const val consumptionDatasetName = "exported_data_consumption"
+
 private const val idFieldName = "pseudoid"
+private const val msisdnIdPropertyName = "msisdnId"
 
 /**
  * Exports pseudonym objects to a BigQuery table.
@@ -40,7 +48,7 @@ class PseudonymExport(private val exportId: String, private val bigquery: BigQue
     private var status = Status.INITIAL
     private var error: String = ""
-    private val randomKey = "$exportId-${UUID.randomUUID().toString()}"
+    private val randomKey = "$exportId-${UUID.randomUUID()}"
     private val msisdnExporter: DS2BQExporter = DS2BQExporter(
             tableName = tableName("msisdn"),
             sourceEntity = MsisdnPseudonymEntityKind,
@@ -57,6 +65,12 @@
             randomKey = randomKey,
             datastore = datastore,
             bigquery = bigquery)
+    private val msisdnMappingExporter: SubscriberMsisdnMappingExporter = SubscriberMsisdnMappingExporter(
+            tableName = tableName("sub2msisdn"),
+            msisdnExporter = msisdnExporter,
+            subscriberIdExporter = subscriberIdExporter,
+            datasetName = consumptionDatasetName,
+            bigquery = bigquery)
 
     init {
         upsertTaskStatus()
@@ -70,6 +84,7 @@
         upsertTaskStatus()
         msisdnExporter.doExport()
         subscriberIdExporter.doExport()
+        msisdnMappingExporter.doExport()
         if (status == Status.RUNNING) {
             status = Status.FINISHED
             upsertTaskStatus()
@@ -114,48 +129,35 @@
     }
 
+}
 
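+// Illustrative sketch (not part of this patch) of how the exporters above are expected
+// to be driven through a single PseudonymExport instance. The constructor tail is
+// truncated in the hunk header above, so the datastore parameter and the entry-point
+// name are assumptions here:
+//
+//     PseudonymExport(exportId = "mar18", bigquery = bigquery, datastore = datastore)
+//             .doExport()  // msisdn, then subscriberId, then the sub2msisdn mapping
 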
 /**
  * Class for exporting Datastore tables to BigQuery.
  */
 class DS2BQExporter(
-        private val tableName: String,
+        tableName: String,
         private val sourceEntity: String,
         private val sourceField: String,
-        private val datasetName: String,
+        datasetName: String,
         private val randomKey: String,
         private val datastore: Datastore,
-        private val bigquery: BigQuery) {
-
-    private val logger = LoggerFactory.getLogger(DS2BQExporter::class.java)
-    private val digest = MessageDigest.getInstance("SHA-256")
+        bigquery: BigQuery): BQExporter(tableName, randomKey, datasetName, bigquery) {
 
+    override val logger = LoggerFactory.getLogger(DS2BQExporter::class.java)
     private val idCache: Cache<String, String> = CacheBuilder.newBuilder()
             .maximumSize(5000)
             .build()
-    private var error: String = ""
-    private var totalRows = 0
+    private val digest = MessageDigest.getInstance("SHA-256")
 
-    private fun createTable(): Table {
-        // Delete existing table
-        val deleted = bigquery.delete(datasetName, tableName)
-        if (deleted) {
-            logger.info("Existing table '$tableName' deleted.")
-        }
-        val tableId = TableId.of(datasetName, tableName)
-        // Table field definition
+    override fun getSchema(): Schema {
         val id = Field.of(idFieldName, LegacySQLTypeName.STRING)
         val pseudonym = Field.of(pseudonymPropertyName, LegacySQLTypeName.STRING)
         val source = Field.of(sourceField, LegacySQLTypeName.STRING)
-        // Table schema definition
-        val schema = Schema.of(id, pseudonym, source)
-        val tableDefinition = StandardTableDefinition.of(schema)
-        val tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build()
-        return bigquery.create(tableInfo)
+        return Schema.of(id, pseudonym, source)
     }
 
-    private fun getIdForKey(key: String): String {
+    fun getIdForKey(key: String): String {
         // Retrieves the element from cache.
-        // In case of a cache miss, generate a new SHA.
+        // In case of a cache miss, generate a new SHA.
         return idCache.get(key) {
@@ -165,7 +167,7 @@
         }
     }
 
-    private fun exportPage(pageSize: Int, cursor: Cursor?, table: Table): Cursor? {
+    fun exportPage(pageSize: Int, cursor: Cursor?, table: Table): Cursor? {
         // Dump pseudonyms to BQ, one page at a time. Since all records in a
         // page are inserted at once, use a small page size.
         val queryBuilder = Query.newEntityQueryBuilder()
@@ -177,10 +179,8 @@
         }
         val rows = ArrayList<RowToInsert>()
         val pseudonyms = datastore.run(queryBuilder.build())
-        var pseudonymsInPage = 0
         while (pseudonyms.hasNext()) {
             val entity = pseudonyms.next()
-            pseudonymsInPage++
             totalRows++
             val row = hashMapOf(
                     sourceField to entity.getString(sourceField),
@@ -189,27 +189,19 @@
             val rowId = "rowId$totalRows"
             rows.add(RowToInsert.of(rowId, row))
         }
-        if (pseudonymsInPage != 0) {
-            logger.info("Inserting ${pseudonymsInPage} to ${tableName}")
-            val response = table.insert(rows, true, true)
-            if (response.hasErrors()) {
-                logger.error("Failed to insert Records to '$tableName'", response.insertErrors)
-                error = "$error${response.insertErrors}\n"
-            }
-        }
-        return if (pseudonymsInPage < pageSize) {
+        insertToBq(table, rows)
+        return if (rows.size < pageSize) {
             null
         } else {
             pseudonyms.cursorAfter
         }
-    }
 
     /**
      * Export the Datastore table to BQ.
     * This is done in pages of 100 records.
      */
-    fun doExport() {
+    override fun doExport() {
         logger.info("Starting export to ${tableName}")
         val table = createTable()
         var cursor: Cursor? = null
         do {
@@ -218,5 +210,101 @@
         } while (cursor != null)
         logger.info("Exported ${totalRows} rows to ${tableName}")
     }
+}
+
+
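+// A minimal sketch (not from the patch) of the id derivation that getIdForKey() caches;
+// the lambda body is elided by the hunk boundary above. It is grounded in the fields
+// shown here -- the SHA-256 digest, the per-export randomKey salt and commons-codec
+// Hex -- though the exact concatenation order is an assumption:
+//
+//     val hash = digest.digest("$randomKey-$key".toByteArray(Charsets.UTF_8))
+//     String(Hex.encodeHex(hash))
+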
+/**
+ * Class for exporting the Subscriber -> Msisdn mapping.
+ */
+class SubscriberMsisdnMappingExporter(
+        tableName: String,
+        private val msisdnExporter: DS2BQExporter,
+        private val subscriberIdExporter: DS2BQExporter,
+        datasetName: String,
+        bigquery: BigQuery) :
+        BQExporter(tableName, "", datasetName, bigquery) {
+
+    private val storage by lazy { getResource<AdminDataSource>() }
+    override val logger = LoggerFactory.getLogger(SubscriberMsisdnMappingExporter::class.java)
+
+    override fun getSchema(): Schema {
+        val subscriberId = Field.of(subscriberIdPropertyName, LegacySQLTypeName.STRING)
+        val msisdnId = Field.of(msisdnIdPropertyName, LegacySQLTypeName.STRING)
+        return Schema.of(subscriberId, msisdnId)
+    }
+
+    private fun exportAllPages(table: Table, pageSize: Int) {
+        // Dump subscriber-to-msisdn mappings to BQ, one page at a time. Since all
+        // records in a page are inserted at once, use a small page size.
+        val map: Map<Subscriber, Subscription> = storage.getSubscriberToMsisdnMap()
+        var rows = ArrayList<RowToInsert>()
+        for ((subscriber, subscription) in map) {
+            val encodedSubscriberId = URLEncoder.encode(subscriber.email, "UTF-8")
+            totalRows++
+            val row = hashMapOf(
+                    msisdnIdPropertyName to msisdnExporter.getIdForKey(subscription.msisdn),
+                    subscriberIdPropertyName to subscriberIdExporter.getIdForKey(encodedSubscriberId))
+            val rowId = "rowId$totalRows"
+            rows.add(RowToInsert.of(rowId, row))
+            if (rows.size == pageSize) {
+                // Insert current page to BQ.
+                insertToBq(table, rows)
+                // Reset rows array.
+                rows = ArrayList()
+            }
+        }
+        // Insert remaining rows to BQ.
+        insertToBq(table, rows)
+    }
+
+    /**
+     * Export all subscription mappings to BQ.
+     * This is done in pages of 100 records.
+     */
+    override fun doExport() {
+        logger.info("Starting export to ${tableName}")
+        val table = createTable()
+        exportAllPages(table, 100)
+        logger.info("Exported ${totalRows} rows to ${tableName}")
+    }
+}
+
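+// BQExporter below is a small template-method base: subclasses provide getSchema()
+// and doExport(), and reuse createTable()/insertToBq() plus the shared error and
+// totalRows state. A hypothetical minimal subclass (illustrative names only):
+//
+//     class StaticListExporter(tableName: String, datasetName: String, bq: BigQuery) :
+//             BQExporter(tableName, randomKey = "", datasetName = datasetName, bigquery = bq) {
+//         override val logger = LoggerFactory.getLogger(StaticListExporter::class.java)
+//         override fun getSchema() = Schema.of(Field.of("value", LegacySQLTypeName.STRING))
+//         override fun doExport() {
+//             val table = createTable()
+//             insertToBq(table, arrayListOf(RowToInsert.of("rowId1", mapOf("value" to "x"))))
+//         }
+//     }
+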
+/**
+ * Abstract base class for exporters that write tables to BigQuery.
+ */
+abstract class BQExporter(
+        val tableName: String,
+        private val randomKey: String,
+        private val datasetName: String,
+        private val bigquery: BigQuery) {
+
+    open val logger = LoggerFactory.getLogger(BQExporter::class.java)
+    var error: String = ""
+    var totalRows = 0
+
+    fun createTable(): Table {
+        // Delete existing table
+        val deleted = bigquery.delete(datasetName, tableName)
+        if (deleted) {
+            logger.info("Existing table '$tableName' deleted.")
+        }
+        val tableId = TableId.of(datasetName, tableName)
+        val schema = getSchema()
+        val tableDefinition = StandardTableDefinition.of(schema)
+        val tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build()
+        return bigquery.create(tableInfo)
+    }
+
+    fun insertToBq(table: Table, rows: ArrayList<RowToInsert>) {
+        if (rows.size != 0) {
+            val response = table.insert(rows, true, true)
+            if (response.hasErrors()) {
+                logger.error("Failed to insert Records to '$tableName'", response.insertErrors)
+                error = "$error${response.insertErrors}\n"
+            }
+        }
+    }
+
+    abstract fun getSchema(): Schema
+    abstract fun doExport()
 }

From bfe97840975633d66dab159c3c0261d3504ca9e1 Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Tue, 25 Sep 2018 09:54:27 +0200
Subject: [PATCH 2/6] Export purchase records and msisdn-to-subscriber mapping tables

---
 exporter/script/delete_export_data.sh | 19 +++++++++++++++++
 exporter/script/export_data.sh        | 30 ++++++++++++++++++++++++++-
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/exporter/script/delete_export_data.sh b/exporter/script/delete_export_data.sh
index d53e2d667..5df587747 100644
--- a/exporter/script/delete_export_data.sh
+++ b/exporter/script/delete_export_data.sh
@@ -12,17 +12,36 @@ projectId=pantel-2decb
 msisdnPseudonymsTable=exported_pseudonyms.${exportId}_msisdn
 subscriberPseudonymsTable=exported_pseudonyms.${exportId}_subscriber
+sub2msisdnMappingsTable=$projectId.exported_pseudonyms.${exportId}_sub2msisdn
 dataConsumptionTable=exported_data_consumption.$exportId
+purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv
+purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
+sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv
 
 echo "Cleaning all data for export $exportId"
 
 echo "Deleting Table $msisdnPseudonymsTable"
 bq rm -f -t $msisdnPseudonymsTable
+
 echo "Deleting Table $subscriberPseudonymsTable"
 bq rm -f -t $subscriberPseudonymsTable
+
+echo "Deleting Table $sub2msisdnMappingsTable"
+bq rm -f -t $sub2msisdnMappingsTable
+
 echo "Deleting Table $dataConsumptionTable"
 bq rm -f -t $dataConsumptionTable
+
+echo "Deleting Table $purchaseRecordsTable"
+bq rm -f -t $purchaseRecordsTable
+
 echo "Deleting csv gs://$csvfile"
 gsutil rm gs://$csvfile
 
+echo "Deleting csv gs://$purchasesCsvfile"
+gsutil rm gs://$purchasesCsvfile
+
+echo "Deleting csv gs://$sub2msisdnCsvfile"
+gsutil rm gs://$sub2msisdnCsvfile
+
 echo "Finished cleanup for the export $exportId"
\ No newline at end of file

diff --git a/exporter/script/export_data.sh b/exporter/script/export_data.sh
index 6b2fe1c78..fa27015c3 100644
--- a/exporter/script/export_data.sh
+++ b/exporter/script/export_data.sh
@@ -11,9 +11,14 @@ projectId=pantel-2decb
 msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
 subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
+sub2msisdnMappingsTable=$projectId.exported_pseudonyms.${exportId}_sub2msisdn
 hourlyConsumptionTable=$projectId.data_consumption.hourly_consumption
 dataConsumptionTable=exported_data_consumption.$exportId
+rawPurchasesTable=$projectId.purchases.raw_purchases
+purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv
+purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
+sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv
 
 # Generate the pseudonym tables for this export
 echo "Starting export job for $exportId"
@@ -65,4 +70,27 @@ echo "Created table $dataConsumptionTable"
 
 echo "Exporting data to csv $csvfile"
 bq --location=EU extract --destination_format=CSV $dataConsumptionTable gs://$csvfile
-echo "Exported data to gs://$csvfile"
\ No newline at end of file
+echo "Exported data to gs://$csvfile"
+
+echo "Creating table purchaseRecordsTable"
+# SQL for joining subscriber pseudonym & purchase record tables.
+read -r -d '' sqlForJoin2 << EOM
+SELECT
+   TIMESTAMP_MILLIS(pr.timestamp) as timestamp , ps.pseudoid as subscriberId, pr.product.sku, pr.product.price.amount, product.price.currency
+FROM
+  `$rawPurchasesTable` as pr
+JOIN
+  `$subscriberPseudonymsTable` as ps
+ON ps.pseudonym = pr.subscriberId
+EOM
+# Run the query using bq & dump results to the new table
+bq --location=EU --format=none query --destination_table $purchaseRecordsTable --replace --use_legacy_sql=false $sqlForJoin2
+echo "Created table $purchaseRecordsTable"
+
+echo "Exporting data to csv $purchasesCsvfile"
+bq --location=EU extract --destination_format=CSV $purchaseRecordsTable gs://$purchasesCsvfile
+echo "Exported data to gs://$purchasesCsvfile"
+
+echo "Exporting data to csv $sub2msisdnCsvfile"
+bq --location=EU extract --destination_format=CSV $sub2msisdnMappingsTable gs://$sub2msisdnCsvfile
+echo "Exported data to gs://$sub2msisdnCsvfile"

From 1a6aed9cfc38cf595566e1b3338445dbfb71f5f0 Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Tue, 25 Sep 2018 10:31:49 +0200
Subject: [PATCH 3/6] Fix the SQL statement for 2nd join

---
 exporter/script/export_data.sh | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/exporter/script/export_data.sh b/exporter/script/export_data.sh
index fa27015c3..96a57c4d2 100644
--- a/exporter/script/export_data.sh
+++ b/exporter/script/export_data.sh
@@ -72,17 +72,18 @@ echo "Exporting data to csv $csvfile"
 bq --location=EU extract --destination_format=CSV $dataConsumptionTable gs://$csvfile
 echo "Exported data to gs://$csvfile"
 
-echo "Creating table purchaseRecordsTable"
+echo "Creating table $purchaseRecordsTable"
 # SQL for joining subscriber pseudonym & purchase record tables.
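 # Note on the change below: inside an unquoted heredoc the shell still performs
 # command substitution, so bare backticks around the table names are executed as
 # commands instead of reaching BigQuery as quoted identifiers; escaping them as \`
 # keeps them literal inside $sqlForJoin2.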
 read -r -d '' sqlForJoin2 << EOM
 SELECT
    TIMESTAMP_MILLIS(pr.timestamp) as timestamp , ps.pseudoid as subscriberId, pr.product.sku, pr.product.price.amount, product.price.currency
 FROM
-  `$rawPurchasesTable` as pr
+  \`$rawPurchasesTable\` as pr
 JOIN
-  `$subscriberPseudonymsTable` as ps
+  \`$subscriberPseudonymsTable\` as ps
 ON ps.pseudonym = pr.subscriberId
 EOM
+
 # Run the query using bq & dump results to the new table
 bq --location=EU --format=none query --destination_table $purchaseRecordsTable --replace --use_legacy_sql=false $sqlForJoin2
 echo "Created table $purchaseRecordsTable"

From ce05a98b0c873754284c5585c102a652d41e1c5d Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Tue, 25 Sep 2018 10:43:59 +0200
Subject: [PATCH 4/6] Add project ID to the table names

---
 exporter/script/delete_export_data.sh | 10 +++++-----
 exporter/script/export_data.sh        |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/exporter/script/delete_export_data.sh b/exporter/script/delete_export_data.sh
index 5df587747..7f8bc3360 100644
--- a/exporter/script/delete_export_data.sh
+++ b/exporter/script/delete_export_data.sh
@@ -10,11 +10,11 @@ exportId=${exportId//-}
 exportId=${exportId,,}
 projectId=pantel-2decb
 
-msisdnPseudonymsTable=exported_pseudonyms.${exportId}_msisdn
-subscriberPseudonymsTable=exported_pseudonyms.${exportId}_subscriber
-sub2msisdnMappingsTable=$projectId.exported_pseudonyms.${exportId}_sub2msisdn
-dataConsumptionTable=exported_data_consumption.$exportId
-purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
+msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
+subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
+sub2msisdnMappingsTable=$projectId.exported_data_consumption.${exportId}_sub2msisdn
+dataConsumptionTable=$projectId.exported_data_consumption.$exportId
+purchaseRecordsTable=$projectId.exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv
 purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
 sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv

diff --git a/exporter/script/export_data.sh b/exporter/script/export_data.sh
index 96a57c4d2..d1bb90671 100644
--- a/exporter/script/export_data.sh
+++ b/exporter/script/export_data.sh
@@ -11,9 +11,9 @@ projectId=pantel-2decb
 msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
 subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
-sub2msisdnMappingsTable=$projectId.exported_pseudonyms.${exportId}_sub2msisdn
+sub2msisdnMappingsTable=$projectId.exported_data_consumption.${exportId}_sub2msisdn
 hourlyConsumptionTable=$projectId.data_consumption.hourly_consumption
-dataConsumptionTable=exported_data_consumption.$exportId
+dataConsumptionTable=$projectId.exported_data_consumption.$exportId
 rawPurchasesTable=$projectId.purchases.raw_purchases
 purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv
 purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
 sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv

From ac350828c588ce06c184ceea314072bcb06be4e5 Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Tue, 25 Sep 2018 10:50:27 +0200
Subject: [PATCH 5/6] Remove unwanted projectid from the name

---
 exporter/script/delete_export_data.sh | 6 +++---
 exporter/script/export_data.sh        | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/exporter/script/delete_export_data.sh b/exporter/script/delete_export_data.sh
index 7f8bc3360..5de0d5973 100644
--- a/exporter/script/delete_export_data.sh
+++ b/exporter/script/delete_export_data.sh
@@ -12,9 +12,9 @@ projectId=pantel-2decb
 msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
 subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
-sub2msisdnMappingsTable=$projectId.exported_data_consumption.${exportId}_sub2msisdn
-dataConsumptionTable=$projectId.exported_data_consumption.$exportId
-purchaseRecordsTable=$projectId.exported_data_consumption.${exportId}_purchases
+sub2msisdnMappingsTable=exported_data_consumption.${exportId}_sub2msisdn
+dataConsumptionTable=exported_data_consumption.$exportId
+purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv
 purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
 sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv

diff --git a/exporter/script/export_data.sh b/exporter/script/export_data.sh
index d1bb90671..a63afd2e2 100644
--- a/exporter/script/export_data.sh
+++ b/exporter/script/export_data.sh
@@ -11,9 +11,9 @@ projectId=pantel-2decb
 msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
 subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
-sub2msisdnMappingsTable=$projectId.exported_data_consumption.${exportId}_sub2msisdn
+sub2msisdnMappingsTable=exported_data_consumption.${exportId}_sub2msisdn
 hourlyConsumptionTable=$projectId.data_consumption.hourly_consumption
-dataConsumptionTable=$projectId.exported_data_consumption.$exportId
+dataConsumptionTable=exported_data_consumption.$exportId
 rawPurchasesTable=$projectId.purchases.raw_purchases
 purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
 csvfile=$projectId-dataconsumption-export/$exportId.csv

From b8427639df0f9245216287e7472bac813ed5e35b Mon Sep 17 00:00:00 2001
From: Prasanth Ullattil
Date: Tue, 25 Sep 2018 10:55:19 +0200
Subject: [PATCH 6/6] More name corrections

---
 exporter/script/delete_export_data.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/exporter/script/delete_export_data.sh b/exporter/script/delete_export_data.sh
index 5de0d5973..2d59e7e2b 100644
--- a/exporter/script/delete_export_data.sh
+++ b/exporter/script/delete_export_data.sh
@@ -10,8 +10,8 @@ exportId=${exportId//-}
 exportId=${exportId,,}
 projectId=pantel-2decb
 
-msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
-subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
+msisdnPseudonymsTable=exported_pseudonyms.${exportId}_msisdn
+subscriberPseudonymsTable=exported_pseudonyms.${exportId}_subscriber
 sub2msisdnMappingsTable=exported_data_consumption.${exportId}_sub2msisdn
 dataConsumptionTable=exported_data_consumption.$exportId
 purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
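
-- 
Usage sketch (illustrative, not part of the series): assuming the scripts read the
export id from their first argument and that bq/gsutil are authenticated against the
pantel-2decb project, a full round trip would look roughly like:

    exportId="2018-09-25a"
    ./exporter/script/export_data.sh "$exportId"          # build tables and CSV exports
    gsutil ls gs://pantel-2decb-dataconsumption-export/   # verify the three CSV files
    ./exporter/script/delete_export_data.sh "$exportId"   # drop tables and CSV files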