Commit
Merge pull request #320 from ostelco/feature/export-more-types
Export Subscriber to MSISDN mappings
prasanthu authored Sep 25, 2018
2 parents 2d1789d + b842763 commit b3132f8
Showing 3 changed files with 175 additions and 39 deletions.
19 changes: 19 additions & 0 deletions exporter/script/delete_export_data.sh
@@ -12,17 +12,36 @@ projectId=pantel-2decb

msisdnPseudonymsTable=exported_pseudonyms.${exportId}_msisdn
subscriberPseudonymsTable=exported_pseudonyms.${exportId}_subscriber
sub2msisdnMappingsTable=exported_data_consumption.${exportId}_sub2msisdn
dataConsumptionTable=exported_data_consumption.$exportId
purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
csvfile=$projectId-dataconsumption-export/$exportId.csv
purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv

echo "Cleaning all data for export $exportId"
echo "Deleting Table $msisdnPseudonymsTable"
bq rm -f -t $msisdnPseudonymsTable

echo "Deleting Table $subscriberPseudonymsTable"
bq rm -f -t $subscriberPseudonymsTable

echo "Deleting Table $sub2msisdnMappingsTable"
bq rm -f -t $sub2msisdnMappingsTable

echo "Deleting Table $dataConsumptionTable"
bq rm -f -t $dataConsumptionTable

echo "Deleting Table $purchaseRecordsTable"
bq rm -f -t $purchaseRecordsTable

echo "Deleting csv gs://$csvfile"
gsutil rm gs://$csvfile

echo "Deleting csv gs://$purchasesCsvfile"
gsutil rm gs://$purchasesCsvfile

echo "Deleting csv gs://$sub2msisdnCsvfile"
gsutil rm gs://$sub2msisdnCsvfile

echo "Finished cleanup for the export $exportId"
31 changes: 30 additions & 1 deletion exporter/script/export_data.sh
@@ -11,9 +11,14 @@ projectId=pantel-2decb

msisdnPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_msisdn
subscriberPseudonymsTable=$projectId.exported_pseudonyms.${exportId}_subscriber
sub2msisdnMappingsTable=exported_data_consumption.${exportId}_sub2msisdn
hourlyConsumptionTable=$projectId.data_consumption.hourly_consumption
dataConsumptionTable=exported_data_consumption.$exportId
rawPurchasesTable=$projectId.purchases.raw_purchases
purchaseRecordsTable=exported_data_consumption.${exportId}_purchases
csvfile=$projectId-dataconsumption-export/$exportId.csv
purchasesCsvfile=$projectId-dataconsumption-export/$exportId-purchases.csv
sub2msisdnCsvfile=$projectId-dataconsumption-export/$exportId-sub2msisdn.csv

# Generate the pseudonym tables for this export
echo "Starting export job for $exportId"
@@ -65,4 +70,28 @@ echo "Created table $dataConsumptionTable"

echo "Exporting data to csv $csvfile"
bq --location=EU extract --destination_format=CSV $dataConsumptionTable gs://$csvfile
echo "Exported data to gs://$csvfile"
echo "Exported data to gs://$csvfile"

echo "Creating table $purchaseRecordsTable"
# SQL for joining subscriber pseudonym & purchase record tables.
read -r -d '' sqlForJoin2 << EOM
SELECT
TIMESTAMP_MILLIS(pr.timestamp) as timestamp, ps.pseudoid as subscriberId, pr.product.sku, pr.product.price.amount, pr.product.price.currency
FROM
\`$rawPurchasesTable\` as pr
JOIN
\`$subscriberPseudonymsTable\` as ps
ON ps.pseudonym = pr.subscriberId
EOM

# Run the query using bq & dump results to the new table
bq --location=EU --format=none query --destination_table $purchaseRecordsTable --replace --use_legacy_sql=false $sqlForJoin2
echo "Created table $purchaseRecordsTable"

echo "Exporting data to csv $purchasesCsvfile"
bq --location=EU extract --destination_format=CSV $purchaseRecordsTable gs://$purchasesCsvfile
echo "Exported data to gs://$purchasesCsvfile"

echo "Exporting data to csv $sub2msisdnCsvfile"
bq --location=EU extract --destination_format=CSV $sub2msisdnMappingsTable gs://$sub2msisdnCsvfile
echo "Exported data to gs://$sub2msisdnCsvfile"
@@ -17,13 +17,21 @@ import com.google.cloud.datastore.StructuredQuery
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import org.apache.commons.codec.binary.Hex
import org.ostelco.prime.model.Subscriber
import org.ostelco.prime.model.Subscription
import org.ostelco.prime.module.getResource
import org.ostelco.prime.storage.AdminDataSource
import org.ostelco.pseudonym.*
import org.slf4j.LoggerFactory
import java.util.*
import java.net.URLEncoder
import java.security.MessageDigest
import java.util.UUID

private const val datasetName = "exported_pseudonyms"
private const val consumptionDatasetName = "exported_data_consumption"

private const val idFieldName = "pseudoid"
private const val msisdnIdPropertyName = "msisdnId"

/**
* Exports pseudonym objects to a bigquery Table
@@ -40,7 +48,7 @@ class PseudonymExport(private val exportId: String, private val bigquery: BigQue

private var status = Status.INITIAL
private var error: String = ""
private val randomKey = "$exportId-${UUID.randomUUID().toString()}"
private val randomKey = "$exportId-${UUID.randomUUID()}"
private val msisdnExporter: DS2BQExporter = DS2BQExporter(
tableName = tableName("msisdn"),
sourceEntity = MsisdnPseudonymEntityKind,
@@ -57,6 +65,12 @@ class PseudonymExport(private val exportId: String, private val bigquery: BigQue
randomKey = randomKey,
datastore = datastore,
bigquery = bigquery)
private val msisdnMappingExporter: SubscriberMsisdnMappingExporter = SubscriberMsisdnMappingExporter(
tableName = tableName("sub2msisdn"),
msisdnExporter = msisdnExporter,
subscriberIdExporter = subscriberIdExporter,
datasetName = consumptionDatasetName,
bigquery = bigquery)

init {
upsertTaskStatus()
@@ -70,6 +84,7 @@ class PseudonymExport(private val exportId: String, private val bigquery: BigQue
upsertTaskStatus()
msisdnExporter.doExport()
subscriberIdExporter.doExport()
msisdnMappingExporter.doExport()
if (status == Status.RUNNING) {
status = Status.FINISHED
upsertTaskStatus()
@@ -114,48 +129,35 @@ class PseudonymExport(private val exportId: String, private val bigquery: BigQue
}
}


}

/**
* Class for exporting Datastore tables to BigQuery.
*/
class DS2BQExporter(
private val tableName: String,
tableName: String,
private val sourceEntity: String,
private val sourceField: String,
private val datasetName: String,
datasetName: String,
private val randomKey: String,
private val datastore: Datastore,
private val bigquery: BigQuery) {

private val logger = LoggerFactory.getLogger(DS2BQExporter::class.java)
private val digest = MessageDigest.getInstance("SHA-256")
bigquery: BigQuery): BQExporter(tableName, randomKey, datasetName, bigquery) {

override val logger = LoggerFactory.getLogger(DS2BQExporter::class.java)
private val idCache: Cache<String, String> = CacheBuilder.newBuilder()
.maximumSize(5000)
.build()
private var error: String = ""
private var totalRows = 0
private val digest = MessageDigest.getInstance("SHA-256")

private fun createTable(): Table {
// Delete existing table
val deleted = bigquery.delete(datasetName, tableName)
if (deleted) {
logger.info("Existing table '$tableName' deleted.")
}
val tableId = TableId.of(datasetName, tableName)
// Table field definition
override fun getSchema(): Schema {
val id = Field.of(idFieldName, LegacySQLTypeName.STRING)
val pseudonym = Field.of(pseudonymPropertyName, LegacySQLTypeName.STRING)
val source = Field.of(sourceField, LegacySQLTypeName.STRING)
// Table schema definition
val schema = Schema.of(id, pseudonym, source)
val tableDefinition = StandardTableDefinition.of(schema)
val tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build()
return bigquery.create(tableInfo)
return Schema.of(id, pseudonym, source)
}

private fun getIdForKey(key: String): String {
fun getIdForKey(key: String): String {
// Retrieves the element from cache.
// In case of a cache miss, generate a new SHA
return idCache.get(key) {
@@ -165,7 +167,7 @@ class DS2BQExporter(
}
}

private fun exportPage(pageSize: Int, cursor: Cursor?, table: Table): Cursor? {
fun exportPage(pageSize: Int, cursor: Cursor?, table: Table): Cursor? {
// Dump pseudonyms to BQ, one page at a time. Since all records in a
// page are inserted at once, use a small page size
val queryBuilder = Query.newEntityQueryBuilder()
@@ -177,10 +179,8 @@ class DS2BQExporter(
}
val rows = ArrayList<RowToInsert>()
val pseudonyms = datastore.run(queryBuilder.build())
var pseudonymsInPage = 0
while (pseudonyms.hasNext()) {
val entity = pseudonyms.next()
pseudonymsInPage++
totalRows++
val row = hashMapOf(
sourceField to entity.getString(sourceField),
@@ -189,27 +189,19 @@ class DS2BQExporter(
val rowId = "rowId$totalRows"
rows.add(RowToInsert.of(rowId, row))
}
if (pseudonymsInPage != 0) {
logger.info("Inserting ${pseudonymsInPage} to ${tableName}")
val response = table.insert(rows, true, true)
if (response.hasErrors()) {
logger.error("Failed to insert Records to '$tableName'", response.insertErrors)
error = "$error${response.insertErrors}\n"
}
}
return if (pseudonymsInPage < pageSize) {
insertToBq(table, rows)
return if (rows.size < pageSize) {
null
} else {
pseudonyms.cursorAfter
}

}

/**
* Export the Datastore table to BQ.
* This is done in pages of 100 records.
*/
fun doExport() {
override fun doExport() {
logger.info("Starting export to ${tableName}")
val table = createTable()
var cursor: Cursor? = null
Expand All @@ -218,5 +210,101 @@ class DS2BQExporter(
} while (cursor != null)
logger.info("Exported ${totalRows} rows to ${tableName}")
}
}


/**
* Class for exporting the Subscriber -> Msisdn mapping.
*/
class SubscriberMsisdnMappingExporter(
tableName: String,
private val msisdnExporter: DS2BQExporter,
private val subscriberIdExporter: DS2BQExporter,
datasetName: String,
bigquery: BigQuery) :
BQExporter(tableName, "", datasetName, bigquery) {

private val storage by lazy { getResource<AdminDataSource>() }
override val logger = LoggerFactory.getLogger(SubscriberMsisdnMappingExporter::class.java)

override fun getSchema(): Schema {
val subscriberId = Field.of(subscriberIdPropertyName, LegacySQLTypeName.STRING)
val msisdnId = Field.of(msisdnIdPropertyName, LegacySQLTypeName.STRING)
return Schema.of(subscriberId, msisdnId)
}

private fun exportAllPages(table: Table, pageSize: Int) {
// Dump subscriber -> msisdn mapping rows to BQ, one page at a time. Since all records in a
// page are inserted at once, use a small page size
val map: Map<Subscriber, Subscription> = storage.getSubscriberToMsisdnMap()
var rows = ArrayList<RowToInsert>()
for ((subscriber, subscription) in map) {
val encodedSubscriberId = URLEncoder.encode(subscriber.email, "UTF-8")
totalRows++
val row = hashMapOf(
msisdnIdPropertyName to msisdnExporter.getIdForKey(subscription.msisdn),
subscriberIdPropertyName to subscriberIdExporter.getIdForKey(encodedSubscriberId))
val rowId = "rowId$totalRows"
rows.add(RowToInsert.of(rowId, row))
if (rows.size == pageSize) {
// Insert current page to BQ
insertToBq(table, rows)
// Reset rows array.
rows = ArrayList<RowToInsert>()
}
}
// Insert remaining rows to BQ
insertToBq(table, rows)
}

/**
* Export all subscriber -> MSISDN mappings to BQ.
* This is done in pages of 100 records.
*/
override fun doExport() {
logger.info("Starting export to ${tableName}")
val table = createTable()
exportAllPages(table, 100)
logger.info("Exported ${totalRows} rows to ${tableName}")
}
}

/**
* Base class for exporters that write rows to a BigQuery table.
*/
abstract class BQExporter(
val tableName: String,
private val randomKey: String,
private val datasetName: String,
private val bigquery: BigQuery) {

open val logger = LoggerFactory.getLogger(BQExporter::class.java)
var error: String = ""
var totalRows = 0

fun createTable(): Table {
// Delete existing table
val deleted = bigquery.delete(datasetName, tableName)
if (deleted) {
logger.info("Existing table '$tableName' deleted.")
}
val tableId = TableId.of(datasetName, tableName)
val schema = getSchema()
val tableDefinition = StandardTableDefinition.of(schema)
val tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build()
return bigquery.create(tableInfo)
}

fun insertToBq(table: Table, rows: ArrayList<RowToInsert>) {
if (rows.size != 0) {
val response = table.insert(rows, true, true)
if (response.hasErrors()) {
logger.error("Failed to insert Records to '$tableName'", response.insertErrors)
error = "$error${response.insertErrors}\n"
}
}
}

abstract fun getSchema(): Schema
abstract fun doExport()
}
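The table written by SubscriberMsisdnMappingExporter should end up with exactly two STRING columns, the subscriber pseudonym id and the msisdnId. A hedged sketch of checking the result of a run from the command line (not part of this commit; assumes the default project is set and exportId matches the one used by the scripts above):

# Inspect the schema and a few rows of the new mapping table.
bq show --schema --format=prettyjson exported_data_consumption.${exportId}_sub2msisdn
bq head -n 5 exported_data_consumption.${exportId}_sub2msisdn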
