Skip to content

Commit

Permalink
prime-1.13.0 - Pseudo MSISDN. Metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
vihangpatil authored Aug 31, 2018
2 parents 3f4fe09 + 78df92f commit 314e8a5
Show file tree
Hide file tree
Showing 112 changed files with 1,612 additions and 668 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ secrets/*
.nb-gradle
.swagger_gen_dir

api_descriptor.pb
ocs_descriptor.pb
metrics_descriptor.pb
2 changes: 1 addition & 1 deletion acceptance-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies {
implementation project(":prime-client-api")
implementation project(':diameter-test')

implementation "com.stripe:stripe-java:6.3.0"
implementation "com.stripe:stripe-java:6.8.0"
implementation 'io.jsonwebtoken:jjwt:0.9.1'
// tests fail when updated to 2.27
implementation "org.glassfish.jersey.media:jersey-media-json-jackson:2.25.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ class SegmentResource {
@PathParam("segment-id") segmentId: String,
segment: Segment): Response {

segment.id = segmentId
if (segment.id != segmentId) {
return Response
.status(Response.Status.FORBIDDEN)
.entity("segment id in path and body do not match")
.build()
}

return adminDataSource.updateSegment(segment)
.fold({ Response.status(Response.Status.NOT_MODIFIED).entity(it.message).build() },
Expand Down
1 change: 1 addition & 0 deletions analytics-grpc-api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Analytics API
8 changes: 4 additions & 4 deletions ocs-api/build.gradle → analytics-grpc-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ plugins {
id "idea"
}

// Keeping it version 1.13.1 to be consistent with grpc via PubSub client lib
// Keeping it version 1.13.1 to be consistent with netty via Firebase lib
ext.grpcVersion = "1.13.1"
// Keeping it version 1.14.0 to be consistent with grpc via PubSub client lib
// Keeping it version 1.14.0 to be consistent with netty via Firebase lib
ext.grpcVersion = "1.14.0"

dependencies {
api "io.grpc:grpc-netty-shaded:$grpcVersion"
Expand All @@ -21,7 +21,7 @@ protobuf {
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
}
}
protoc { artifact = 'com.google.protobuf:protoc:3.6.0' }
protoc { artifact = 'com.google.protobuf:protoc:3.6.1' }
generateProtoTasks {
all()*.plugins {
grpc {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
syntax = "proto3";

package org.ostelco.ocs.api;
package org.ostelco.analytics.api;

option java_multiple_files = true;
option java_package = "org.ostelco.ocs.api";
option java_package = "org.ostelco.analytics.api";

import "google/protobuf/timestamp.proto";

// This is used only to report to Analytics engine by Prime via Google Cloud Pub/Sub.
// This may be moved to a separate library project, in future.

message DataTrafficInfo {
string msisdn = 1;
Expand Down
19 changes: 19 additions & 0 deletions analytics-grpc-api/src/main/proto/prime_metrics.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

package org.ostelco.prime.metrics.api;

option java_multiple_files = true;
option java_package = "org.ostelco.prime.metrics.api";

// This is used to report Analytics events from OCSgw to Prime

service OcsgwAnalyticsService {
rpc OcsgwAnalyticsEvent (stream OcsgwAnalyticsReport) returns (OcsgwAnalyticsReply) {}
}

message OcsgwAnalyticsReport {
uint32 activeSessions = 1;
}

message OcsgwAnalyticsReply {
}
18 changes: 18 additions & 0 deletions analytics-module/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id "org.jetbrains.kotlin.jvm" version "1.2.61"
id "java-library"
}

dependencies {
implementation project(":prime-api")

implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion"
implementation "io.dropwizard:dropwizard-core:$dropwizardVersion"
implementation "com.google.cloud:google-cloud-pubsub:$googleCloudVersion"

testImplementation "io.dropwizard:dropwizard-testing:$dropwizardVersion"
testImplementation 'org.mockito:mockito-core:2.18.3'
testImplementation 'org.assertj:assertj-core:3.10.0'
}

apply from: '../jacoco.gradle'
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.ostelco.prime.analytics

import io.dropwizard.lifecycle.Managed
import io.grpc.BindableService
import io.grpc.Server
import io.grpc.ServerBuilder
import org.ostelco.prime.logger
import java.io.IOException

/**
* This is Analytics Server running on gRPC protocol.
* Its startup and shutdown are managed by Dropwizard's lifecycle
* through the Managed interface.
*
*/
class AnalyticsGrpcServer(private val port: Int, service: BindableService) : Managed {

private val logger by logger()

// may add Transport Security with Certificates if needed.
// may add executor for control over number of threads
private val server: Server = ServerBuilder.forPort(port).addService(service).build()

/**
* Startup is managed by Dropwizard's lifecycle.
*
* @throws IOException ... sometimes, perhaps.
*/
override fun start() {
server.start()
logger.info("Analytics Server started, listening for incoming gRPC traffic on {}", port)
}

/**
* Shutdown is managed by Dropwizard's lifecycle.
*
* @throws InterruptedException When something goes wrong.
*/
override fun stop() {
logger.info("Stopping Analytics Server listening for gRPC traffic on {}", port)
server.shutdown()
blockUntilShutdown()
}

/**
* Used for unit testing
*/
fun forceStop() {
logger.info("Stopping forcefully Analytics Server listening for gRPC traffic on {}", port)
server.shutdownNow()
}

private fun blockUntilShutdown() {
server.awaitTermination()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.ostelco.prime.analytics

import io.grpc.stub.StreamObserver
import org.ostelco.prime.analytics.PrimeMetric.ACTIVE_SESSIONS
import org.ostelco.prime.analytics.metrics.CustomMetricsRegistry
import org.ostelco.prime.logger
import org.ostelco.prime.metrics.api.OcsgwAnalyticsReply
import org.ostelco.prime.metrics.api.OcsgwAnalyticsReport
import org.ostelco.prime.metrics.api.OcsgwAnalyticsServiceGrpc
import java.util.*


/**
* Serves incoming GRPC analytcs requests.
*
* It's implemented as a subclass of [OcsServiceGrpc.OcsServiceImplBase] overriding
* methods that together implements the protocol described in the analytics protobuf
* file: ocs_analytics.proto
*`
* service OcsgwAnalyticsService {
* rpc OcsgwAnalyticsEvent (stream OcsgwAnalyticsReport) returns (OcsgwAnalyticsReply) {}
* }
*/

class AnalyticsGrpcService : OcsgwAnalyticsServiceGrpc.OcsgwAnalyticsServiceImplBase() {

private val logger by logger()

/**
* Handles the OcsgwAnalyticsEvent message.
*/
override fun ocsgwAnalyticsEvent(ocsgwAnalyticsReply: StreamObserver<OcsgwAnalyticsReply>): StreamObserver<OcsgwAnalyticsReport> {
val streamId = newUniqueStreamId()
return StreamObserverForStreamWithId(streamId)
}

/**
* Return an unique ID based on Java's UUID generator that uniquely
* identifies a stream of values.
* @return A new unique identifier.
*/
private fun newUniqueStreamId(): String {
return UUID.randomUUID().toString()
}

private inner class StreamObserverForStreamWithId internal constructor(private val streamId: String) : StreamObserver<OcsgwAnalyticsReport> {

/**
* This method gets called every time a new active session count is sent
* from the OCS GW.
* @param request provides current active session as a counter with a timestamp
*/
override fun onNext(request: OcsgwAnalyticsReport) {
CustomMetricsRegistry.updateMetricValue(ACTIVE_SESSIONS, request.activeSessions.toLong())
}

override fun onError(t: Throwable) {
// TODO vihang: handle onError for stream observers
}

override fun onCompleted() {
logger.info("AnalyticsGrpcService with streamId: {} completed", streamId)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.ostelco.prime.analytics

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonTypeName
import io.dropwizard.setup.Environment
import org.hibernate.validator.constraints.NotEmpty
import org.ostelco.prime.analytics.metrics.CustomMetricsRegistry
import org.ostelco.prime.analytics.publishers.DataConsumptionInfoPublisher
import org.ostelco.prime.module.PrimeModule

@JsonTypeName("analytics")
class AnalyticsModule : PrimeModule {

@JsonProperty("config")
fun setConfig(config: AnalyticsConfig) {
ConfigRegistry.config = config
}

override fun init(env: Environment) {

CustomMetricsRegistry.init(env.metrics())

val server = AnalyticsGrpcServer(8083, AnalyticsGrpcService())

env.lifecycle().manage(server)

// dropwizard starts Analytics events publisher
env.lifecycle().manage(DataConsumptionInfoPublisher)
}
}

class AnalyticsConfig {
@NotEmpty
@JsonProperty("projectId")
lateinit var projectId: String

@NotEmpty
@JsonProperty("topicId")
lateinit var topicId: String
}

object ConfigRegistry {
lateinit var config: AnalyticsConfig
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.ostelco.prime.analytics

import org.ostelco.prime.analytics.metrics.CustomMetricsRegistry
import org.ostelco.prime.analytics.publishers.DataConsumptionInfoPublisher
import org.ostelco.prime.logger

class AnalyticsServiceImpl : AnalyticsService {

private val logger by logger()

override fun reportTrafficInfo(msisdn: String, usedBytes: Long, bundleBytes: Long) {
logger.info("reportTrafficInfo : msisdn {} usedBytes {} bundleBytes {}", msisdn, usedBytes, bundleBytes)
DataConsumptionInfoPublisher.publish(msisdn, usedBytes, bundleBytes)
}

override fun reportMetric(primeMetric: PrimeMetric, value: Long) {
CustomMetricsRegistry.updateMetricValue(primeMetric, value)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.ostelco.prime.analytics.metrics

import com.codahale.metrics.Counter
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import org.ostelco.prime.analytics.MetricType.COUNTER
import org.ostelco.prime.analytics.MetricType.GAUGE
import org.ostelco.prime.analytics.PrimeMetric

/**
* Singleton wrapper dropwizard metrics.
*/
object CustomMetricsRegistry {

private lateinit var registry: MetricRegistry
// boolean flag to avoid access to late init registry if it not yet initialized
private var isInitialized = false

// map of long values which will act as cache for Gauge
private val gaugeValueMap: MutableMap<PrimeMetric, Long> = mutableMapOf()

// map of counters
private val counterMap: MutableMap<PrimeMetric, Counter> = mutableMapOf()

@Synchronized
fun init(registry: MetricRegistry) {
this.registry = registry
isInitialized = true
counterMap.keys.forEach { registerCounter(it) }
gaugeValueMap.keys.forEach { registerGauge(it) }
}

/**
* Update metric value.
*
* If metric is of type COUNTER, then the counter is increment by that value.
* If metric is of type GAUGE, then the gauge source is set to that value.
*
* @param primeMetric
* @param value
*/
@Synchronized
fun updateMetricValue(primeMetric: PrimeMetric, value: Long) {
when (primeMetric.metricType) {
COUNTER -> {
val counterExists = counterMap.containsKey(primeMetric)
counterMap.getOrPut(primeMetric) { Counter() }.inc(value)
if (isInitialized && !counterExists) {
registerCounter(primeMetric)
}
}
GAUGE -> {
val existingGaugeValue = gaugeValueMap.put(primeMetric, value)
if (isInitialized && existingGaugeValue == null) {
registerGauge(primeMetric)
}
}
}
}

/**
* Register counter with value from counterMap
*/
private fun registerCounter(primeMetric: PrimeMetric) {
registry.register(primeMetric.metricName, counterMap[primeMetric])
}

/**
* Register gauge with value from gaugeValueMap as its source
*/
private fun registerGauge(primeMetric: PrimeMetric) {
registry.register(primeMetric.metricName, Gauge<Long> { gaugeValueMap[primeMetric] })
}
}
Loading

0 comments on commit 314e8a5

Please sign in to comment.