
Commit

Renamed analytics to dataflow-pipelines.
vihangpatil authored Aug 30, 2018
2 parents 19ae6d9 + ca523a7 commit 5e36d9f
Showing 19 changed files with 271 additions and 179 deletions.

This file was deleted.

2 changes: 1 addition & 1 deletion analytics/Dockerfile → dataflow-pipelines/Dockerfile
@@ -6,7 +6,7 @@ COPY script/start.sh /start.sh

COPY config /config

-COPY build/libs/analytics-uber.jar /analytics.jar
+COPY build/libs/dataflow-pipelines-uber.jar /dataflow-pipelines.jar

EXPOSE 8080
EXPOSE 8081
2 changes: 1 addition & 1 deletion analytics/README.md → dataflow-pipelines/README.md
@@ -15,4 +15,4 @@ With unit testing:

## Deploy to GCP

-sudo docker-compose up --build
+docker-compose up --build
2 changes: 1 addition & 1 deletion analytics/build.gradle → dataflow-pipelines/build.gradle
@@ -17,7 +17,7 @@ dependencies {
}

shadowJar {
-    mainClassName = 'org.ostelco.analytics.DataConsumptionPipelineKt'
+    mainClassName = 'org.ostelco.dataflow.pipelines.DeployPipelineKt'
    mergeServiceFiles()
    classifier = "uber"
    version = null
File renamed without changes.
File renamed without changes.
@@ -1,8 +1,8 @@
version: "3.6"
version: "3.7"

services:
analytics:
container_name: analytics
container_name: dataflow-pipelines
build: .
environment:
- GOOGLE_APPLICATION_CREDENTIALS=/config/pantel-prod.json
File renamed without changes.
@@ -0,0 +1,63 @@
package org.ostelco.dataflow.pipelines

import ch.qos.logback.classic.util.ContextInitializer
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.ostelco.dataflow.pipelines.definitions.DataConsumptionPipelineDefinition
import org.ostelco.dataflow.pipelines.definitions.DummyPipelineDefinition
import org.ostelco.dataflow.pipelines.definitions.PipelineDefinition

enum class PipelineDefinitionRegistry(val pipelineDefinition: PipelineDefinition) {
    DATA_CONSUMPTION(DataConsumptionPipelineDefinition),
    DUMMY(DummyPipelineDefinition),
}

fun main(args: Array<String>) {
    System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "config/logback.xml")
    DeployPipeline().deploy(pipelineName = "DATA_CONSUMPTION")
}

class DeployPipeline {

    private fun parseOptions(): PipelineOptions {

        // Maybe we need to pass these options via command-line args instead.
        /*
        val options = PipelineOptionsFactory
                .fromArgs(
                        "--project=pantel-2decb",
                        "--runner=DataflowRunner",
                        "--stagingLocation=gs://data-traffic/staging/",
                        "--jobName=data-traffic")
                .withValidation()
                .create()
        */

        val options = PipelineOptionsFactory.`as`(DataflowPipelineOptions::class.java)
        options.jobName = "data-traffic"
        options.project = "pantel-2decb"
        options.stagingLocation = "gs://data-traffic/staging/"
        options.region = "europe-west1"
        options.runner = DataflowRunner::class.java
        options.isUpdate = true

        return options
    }

    fun deploy(pipelineName: String) {

        val options = parseOptions()

        PipelineDefinitionRegistry
                .valueOf(pipelineName)
                .apply {
                    Pipeline.create(options)
                            .apply { pipelineDefinition.define(this) }
                            .run()
                            .waitUntilFinish()
                }
    }
}
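As the commented-out block in parseOptions() hints, the same options could also be taken from command-line arguments instead of being hardcoded. A minimal sketch of that variant, assuming a helper name (parseOptionsFromArgs) that is not part of this commit; the flag values simply mirror the hardcoded ones above:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Hypothetical alternative to parseOptions(): parse typed Dataflow options from args.
fun parseOptionsFromArgs(args: Array<String>): DataflowPipelineOptions =
        PipelineOptionsFactory
                .fromArgs(*args)
                .withValidation()
                .`as`(DataflowPipelineOptions::class.java)

// Illustrative usage, mirroring the values hardcoded in parseOptions():
// parseOptionsFromArgs(arrayOf(
//         "--project=pantel-2decb",
//         "--runner=DataflowRunner",
//         "--region=europe-west1",
//         "--stagingLocation=gs://data-traffic/staging/",
//         "--jobName=data-traffic"))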
@@ -0,0 +1,123 @@
package org.ostelco.dataflow.pipelines.definitions

import com.google.protobuf.util.Timestamps
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.KvCoder
import org.apache.beam.sdk.coders.VarLongCoder
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder
import org.apache.beam.sdk.transforms.Combine
import org.apache.beam.sdk.transforms.Filter
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.transforms.Sum
import org.apache.beam.sdk.transforms.WithTimestamps
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.joda.time.Duration
import org.joda.time.Instant
import org.ostelco.analytics.api.AggregatedDataTrafficInfo
import org.ostelco.analytics.api.DataTrafficInfo
import org.ostelco.dataflow.pipelines.dsl.ParDoFn
import org.ostelco.dataflow.pipelines.io.BigQueryIOUtils.saveToBigQuery
import org.ostelco.dataflow.pipelines.io.Table.HOURLY_CONSUMPTION
import org.ostelco.dataflow.pipelines.io.Table.RAW_CONSUMPTION
import org.ostelco.dataflow.pipelines.io.convertToHourlyTableRows
import org.ostelco.dataflow.pipelines.io.convertToRawTableRows
import org.ostelco.dataflow.pipelines.io.readFromPubSub
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

object DataConsumptionPipelineDefinition : PipelineDefinition {

    override fun define(pipeline: Pipeline) {

        // Filter out events with empty buckets
        val filterEmptyBucketEvents = Filter.by(SerializableFunction { dataTrafficInfo: DataTrafficInfo ->
            dataTrafficInfo.bucketBytes > 0
        })

        //
        // Construct pipeline chain
        //

        // The first two steps are common to both branches, before the pipeline forks.
        val dataTrafficInfoEvents = pipeline
                .apply("readFromPubSub", readFromPubSub("data-traffic"))
                .apply("filterEmptyBucketEvents", filterEmptyBucketEvents)

        // PubSub events -> raw_consumption BigQuery table
        dataTrafficInfoEvents
                .apply("convertToRawTableRows", convertToRawTableRows)
                .apply("saveRawEventsToBigQuery", saveToBigQuery(RAW_CONSUMPTION))

        // PubSub events -> aggregate by hour -> hourly_consumption BigQuery table
        dataTrafficInfoEvents
                .apply("TotalDataConsumptionGroupByMsisdn", consumptionPerMsisdn)
                .apply("convertToHourlyTableRows", convertToHourlyTableRows)
                .apply("saveToBigQueryGroupedByHour", saveToBigQuery(HOURLY_CONSUMPTION))
    }
}

// This transform is the part of the pipeline that is independent of GCP PubSubIO and BigQueryIO.
// It can therefore run locally without GCP, and the separation allows it to be
// unit-tested with JUnit (a sketch of such a test follows this file's hunk).
val consumptionPerMsisdn = object : PTransform<PCollection<DataTrafficInfo>, PCollection<AggregatedDataTrafficInfo>>() {

    override fun expand(inCollection: PCollection<DataTrafficInfo>): PCollection<AggregatedDataTrafficInfo> {

        val linkTimestamps = WithTimestamps
                .of<DataTrafficInfo> { Instant(Timestamps.toMillis(it.timestamp)) }
                .withAllowedTimestampSkew(Duration.standardMinutes(1L))

        val groupByHour: Window<DataTrafficInfo> = Window
                .into<DataTrafficInfo>(FixedWindows.of(Duration.standardHours(1L)))
                .withAllowedLateness(Duration.standardMinutes(1L))
                .discardingFiredPanes()

        val toKeyValuePair = ParDoFn.transform<DataTrafficInfo, KV<AggregatedDataTrafficInfo, Long>> {
            val zonedDateTime = ZonedDateTime
                    .ofInstant(java.time.Instant.ofEpochMilli(Timestamps.toMillis(it.timestamp)), ZoneOffset.UTC)
                    .withMinute(0)
                    .withSecond(0)
                    .withNano(0)
            val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:SS")
            KV.of(
                    AggregatedDataTrafficInfo.newBuilder()
                            .setMsisdn(it.msisdn)
                            .setDateTime(formatter.format(zonedDateTime))
                            .setDataBytes(0)
                            .build(),
                    it.bucketBytes)
        }

        val reduceToSumOfBucketBytes = Combine.groupedValues<AggregatedDataTrafficInfo, Long, Long>(Sum.ofLongs())

        val kvToSingleObject = ParDoFn.transform<KV<AggregatedDataTrafficInfo, Long>, AggregatedDataTrafficInfo> {
            AggregatedDataTrafficInfo.newBuilder()
                    .setMsisdn(it.key?.msisdn)
                    .setDateTime(it.key?.dateTime)
                    .setDataBytes(it.value)
                    .build()
        }

        // The code above declares all the transformations;
        // the code below chains them into a pipeline.

        return inCollection
                // Use the timestamp carried in the event object instead of the timestamp
                // assigned when the event was published to PubSub.
                .apply("linkTimestamps", linkTimestamps)
                .apply("groupByHour", groupByHour)
                // Convert to KV and then group by key
                .apply("toKeyValuePair", toKeyValuePair)
                .setCoder(KvCoder.of(ProtoCoder.of(AggregatedDataTrafficInfo::class.java), VarLongCoder.of()))
                .apply("groupByKey", GroupByKey.create())
                // Sum the bucket bytes within each group
                .apply("reduceToSumOfBucketBytes", reduceToSumOfBucketBytes)
                .apply("kvToSingleObject", kvToSingleObject)
    }
}
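Since consumptionPerMsisdn is deliberately free of PubSubIO and BigQueryIO, it can be exercised locally with Beam's TestPipeline, as the comment above suggests. A minimal JUnit 4 sketch, assuming the DataTrafficInfo builder fields used above (msisdn, bucketBytes, timestamp), a direct-runner test dependency, and a test class name that is not part of this commit:

import com.google.protobuf.util.Timestamps
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.testing.TestPipeline
import org.apache.beam.sdk.transforms.Create
import org.junit.Rule
import org.junit.Test
import org.ostelco.analytics.api.DataTrafficInfo
import org.ostelco.dataflow.pipelines.definitions.consumptionPerMsisdn

class ConsumptionPerMsisdnTest {

    @Rule
    @JvmField
    val pipeline: TestPipeline = TestPipeline.create()

    @Test
    fun aggregatesBucketBytesPerMsisdn() {
        // Two events for the same msisdn, both falling in the same hour.
        val now = System.currentTimeMillis()
        val events = listOf(100L, 200L).map { bytes ->
            DataTrafficInfo.newBuilder()
                    .setMsisdn("4790300001")
                    .setBucketBytes(bytes)
                    .setTimestamp(Timestamps.fromMillis(now))
                    .build()
        }

        val aggregated = pipeline
                .apply(Create.of(events).withCoder(ProtoCoder.of(DataTrafficInfo::class.java)))
                .apply(consumptionPerMsisdn)

        // Expect a single aggregate with the two buckets summed (100 + 200 = 300 bytes).
        PAssert.that(aggregated).satisfies { rows ->
            val row = rows.single()
            check(row.msisdn == "4790300001" && row.dataBytes == 300L)
            null
        }

        pipeline.run().waitUntilFinish()
    }
}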
@@ -0,0 +1,9 @@
package org.ostelco.dataflow.pipelines.definitions

import org.apache.beam.sdk.Pipeline

object DummyPipelineDefinition : PipelineDefinition {
    override fun define(pipeline: Pipeline) {
        TODO("not implemented") // To change body of created functions use File | Settings | File Templates.
    }
}
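The PipelineDefinition interface implemented by both objects is not among the hunks shown here. Judging from how DataConsumptionPipelineDefinition and DummyPipelineDefinition use it, it presumably amounts to roughly the following (an assumption about its shape, not the committed source):

package org.ostelco.dataflow.pipelines.definitions

import org.apache.beam.sdk.Pipeline

interface PipelineDefinition {
    fun define(pipeline: Pipeline)
}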