Skip to content

Commit

Permalink
OCS Kotlin coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
vihangpatil committed Dec 20, 2018
1 parent e65a0b7 commit c47c7ed
Show file tree
Hide file tree
Showing 24 changed files with 727 additions and 332 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.ostelco.prime.storage.graph

import arrow.core.right
import com.palantir.docker.compose.DockerComposeRule
import com.palantir.docker.compose.connection.waiting.HealthChecks
import org.joda.time.Duration
Expand All @@ -8,8 +9,19 @@ import org.junit.BeforeClass
import org.junit.ClassRule
import org.mockito.Mockito
import org.neo4j.driver.v1.AccessMode.WRITE
import org.ostelco.prime.model.*
import org.ostelco.prime.ocs.OcsAdminService
import org.ostelco.prime.analytics.AnalyticsService
import org.ostelco.prime.model.Offer
import org.ostelco.prime.model.Price
import org.ostelco.prime.model.Product
import org.ostelco.prime.model.PurchaseRecord
import org.ostelco.prime.model.ScanInformation
import org.ostelco.prime.model.ScanResult
import org.ostelco.prime.model.ScanStatus
import org.ostelco.prime.model.Segment
import org.ostelco.prime.model.Subscriber
import org.ostelco.prime.model.Subscription
import org.ostelco.prime.paymentprocessor.PaymentProcessor
import org.ostelco.prime.paymentprocessor.core.ProfileInfo
import java.time.Instant
import java.util.*
import kotlin.test.BeforeTest
Expand All @@ -19,7 +31,11 @@ import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.test.fail

class MockOcsAdminService : OcsAdminService by Mockito.mock(OcsAdminService::class.java)
private val mockPaymentProcessor = Mockito.mock(PaymentProcessor::class.java)
class MockPaymentProcessor : PaymentProcessor by mockPaymentProcessor

private val mockAnalyticsService = Mockito.mock(AnalyticsService::class.java)
class MockAnalyticsService : AnalyticsService by mockAnalyticsService

class GraphStoreTest {

Expand Down Expand Up @@ -47,7 +63,7 @@ class GraphStoreTest {
}

@Test
fun `add subscriber`() {
fun `test - add subscriber`() {

Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null)
.mapLeft { fail(it.message) }
Expand All @@ -63,7 +79,7 @@ class GraphStoreTest {
}

@Test
fun `fail to add subscriber with invalid referred by`() {
fun `test - fail to add subscriber with invalid referred by`() {

Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = "blah")
.fold({
Expand All @@ -75,7 +91,7 @@ class GraphStoreTest {
}

@Test
fun `add subscription`() {
fun `test - add subscription`() {

Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null)
.mapLeft { fail(it.message) }
Expand All @@ -99,6 +115,103 @@ class GraphStoreTest {
// assertEquals(EMAIL, bundleIdArgCaptor.value)
}

@Test
fun `test - purchase`() {

val sku = "1GB_249NOK"
val chargeId = UUID.randomUUID().toString()
// mock
Mockito.`when`(mockPaymentProcessor.getPaymentProfile(userEmail = EMAIL))
.thenReturn(ProfileInfo(EMAIL).right())

Mockito.`when`(mockPaymentProcessor.authorizeCharge(
customerId = EMAIL,
sourceId = null,
amount = 24900,
currency = "NOK")
).thenReturn(chargeId.right())

Mockito.`when`(mockPaymentProcessor.captureCharge(
customerId = EMAIL,
amount = 24900,
currency = "NOK",
chargeId = chargeId)
).thenReturn(chargeId.right())

// prep
Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null)
.mapLeft { fail(it.message) }

Neo4jStoreSingleton.addSubscription(EMAIL, MSISDN)
.mapLeft { fail(it.message) }

Neo4jStoreSingleton.createProduct(createProduct(sku = sku, amount = 24900))
.mapLeft { fail(it.message) }

val offer = Offer(
id = "NEW_OFFER",
segments = listOf(getSegmentNameFromCountryCode(COUNTRY)),
products = listOf(sku))

Neo4jStoreSingleton.createOffer(offer)
.mapLeft { fail(it.message) }

// test
Neo4jStoreSingleton.purchaseProduct(subscriberId = EMAIL, sku = sku, sourceId = null, saveCard = false)
.mapLeft { fail(it.message) }

// assert
Neo4jStoreSingleton.getBundles(subscriberId = EMAIL).bimap(
{ fail(it.message) },
{ bundles ->
bundles.forEach { bundle ->
assertEquals(1_100_000_000L, bundle.balance)
}
})
}

@Test
fun `test - consume`() {
Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null)
.mapLeft { fail(it.message) }

Neo4jStoreSingleton.addSubscription(EMAIL, MSISDN)
.mapLeft { fail(it.message) }

// balance = 100_000_000
// reserved = 0

// requested = 40_000_000
val dataBucketSize = 40_000_000L
Neo4jStoreSingleton.consume(msisdn = MSISDN, usedBytes = 0, requestedBytes = dataBucketSize)
.fold(
{ fail(it.message) },
{
assertEquals(dataBucketSize, it.first) // reserved = 40_000_000
assertEquals(60_000_000L, it.second) // balance = 60_000_000
})

// used = 50_000_000
// requested = 40_000_000
Neo4jStoreSingleton.consume(msisdn = MSISDN, usedBytes = 50_000_000L, requestedBytes = dataBucketSize)
.fold(
{ fail(it.message) },
{
assertEquals(dataBucketSize, it.first) // reserved = 40_000_000
assertEquals(10_000_000L, it.second) // balance = 10_000_000
})

// used = 30_000_000
// requested = 40_000_000
Neo4jStoreSingleton.consume(msisdn = MSISDN, usedBytes = 30_000_000L, requestedBytes = dataBucketSize)
.fold(
{ fail(it.message) },
{
assertEquals(20_000_000L, it.first) // reserved = 20_000_000
assertEquals(0L, it.second) // balance = 0
})
}

@Test
fun `set and get Purchase record`() {
assert(Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null).isRight())
Expand Down Expand Up @@ -126,20 +239,26 @@ class GraphStoreTest {
assert(Neo4jStoreSingleton.addSubscriber(Subscriber(email = EMAIL, name = NAME, country = COUNTRY), referredBy = null).isRight())

Neo4jStoreSingleton.createProduct(createProduct("1GB_249NOK", 24900))
.mapLeft { fail(it.message) }
Neo4jStoreSingleton.createProduct(createProduct("2GB_299NOK", 29900))
.mapLeft { fail(it.message) }
Neo4jStoreSingleton.createProduct(createProduct("3GB_349NOK", 34900))
.mapLeft { fail(it.message) }
Neo4jStoreSingleton.createProduct(createProduct("5GB_399NOK", 39900))
.mapLeft { fail(it.message) }

val segment = Segment(
id = "NEW_SEGMENT",
subscribers = listOf(EMAIL))
Neo4jStoreSingleton.createSegment(segment)
.mapLeft { fail(it.message) }

val offer = Offer(
id = "NEW_OFFER",
segments = listOf("NEW_SEGMENT"),
products = listOf("3GB_349NOK"))
Neo4jStoreSingleton.createOffer(offer)
.mapLeft { fail(it.message) }

Neo4jStoreSingleton.getProducts(EMAIL).bimap(
{ fail(it.message) },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.ostelco.prime.storage.graph.MockAnalyticsService

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.ostelco.prime.storage.graph.MockPaymentProcessor
16 changes: 16 additions & 0 deletions ocs-ktc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
plugins {
id "org.jetbrains.kotlin.jvm" version "1.3.10"
id "java-library"
}

dependencies {
implementation project(':prime-modules')

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinXCoroutinesVersion"

testImplementation "org.jetbrains.kotlin:kotlin-test:$kotlinVersion"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit:$kotlinVersion"
testImplementation "org.mockito:mockito-core:$mockitoVersion"
}

apply from: '../gradle/jacoco.gradle'
34 changes: 34 additions & 0 deletions ocs-ktc/src/main/kotlin/org/ostelco/prime/ocs/OcsModule.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.ostelco.prime.ocs

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonTypeName
import io.dropwizard.setup.Environment
import org.hibernate.validator.constraints.NotEmpty
import org.ostelco.prime.module.PrimeModule
import org.ostelco.prime.ocs.consumption.OcsGrpcServer
import org.ostelco.prime.ocs.consumption.OcsGrpcService
import org.ostelco.prime.ocs.core.OnlineCharging

@JsonTypeName("ocs")
class OcsModule : PrimeModule {

@JsonProperty
fun setConfig(config: Config) {
ConfigRegistry.config = config
}

override fun init(env: Environment) {
env.lifecycle().manage(
OcsGrpcServer(8082, OcsGrpcService(OnlineCharging)))
}
}

class Config {
@NotEmpty
@JsonProperty("lowBalanceThreshold")
var lowBalanceThreshold: Long = 0
}

object ConfigRegistry {
lateinit var config: Config
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.ostelco.prime.ocs.analytics

import org.ostelco.ocs.api.CreditControlRequestInfo
import org.ostelco.prime.analytics.AnalyticsService
import org.ostelco.prime.getLogger
import org.ostelco.prime.module.getResource

/**
* This class publishes the data consumption information events analytics.
*/
object AnalyticsReporter {

private val logger by getLogger()

private val analyticsReporter by lazy { getResource<AnalyticsService>() }

fun report(request: CreditControlRequestInfo, bundleBytes: Long) {
val msisdn = request.msisdn
if (msisdn != null) {
logger.info("Sent Data Consumption info event to analytics")
analyticsReporter.reportTrafficInfo(
msisdn = msisdn,
usedBytes = request.msccList?.firstOrNull()?.used?.totalOctets ?: 0L,
bundleBytes = bundleBytes,
apn = request.serviceInformation?.psInformation?.calledStationId,
mccMnc = request.serviceInformation?.psInformation?.sgsnMccMnc)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.ostelco.prime.ocs.consumption

import io.grpc.stub.StreamObserver
import org.ostelco.ocs.api.ActivateResponse
import org.ostelco.ocs.api.CreditControlAnswerInfo
import org.ostelco.ocs.api.CreditControlRequestInfo

/**
* Ocs Requests from [OcsGrpcService] are consumed by implementation [OcsService] of [OcsAsyncRequestConsumer]
*/
interface OcsAsyncRequestConsumer {
fun putCreditControlClient(streamId: String, creditControlAnswer: StreamObserver<CreditControlAnswerInfo>)
fun creditControlRequestEvent(streamId: String, request: CreditControlRequestInfo)
fun deleteCreditControlClient(streamId: String)
fun updateActivateResponse(streamId: String, activateResponse: StreamObserver<ActivateResponse>)
}

/**
* Ocs Events from [OcsEventToGrpcResponseMapper] forwarded to implementation [OcsService] of [OcsAsyncResponseProducer]
*/
interface OcsAsyncResponseProducer {
fun activateOnNextResponse(response: ActivateResponse)
fun sendCreditControlAnswer(streamId: String, creditControlAnswer: CreditControlAnswerInfo)
fun returnUnusedDataBucketEvent(msisdn: String, reservedBucketBytes: Long)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.ostelco.prime.ocs.consumption

import io.dropwizard.lifecycle.Managed
import io.grpc.BindableService
import io.grpc.Server
import io.grpc.ServerBuilder
import org.ostelco.prime.getLogger

/**
* This is OCS Server running on gRPC protocol.
* Its startup and shutdown are managed by Dropwizard's lifecycle
* through the Managed interface.
*
*/
class OcsGrpcServer(private val port: Int, service: BindableService) : Managed {

private val logger by getLogger()

// may add Transport Security with Certificates if needed.
// may add executor for control over number of threads
private val server: Server = ServerBuilder.forPort(port).addService(service).build()

override fun start() {
server.start()
logger.info("OcsServer Server started, listening for incoming gRPC traffic on {}", port)
}

override fun stop() {
logger.info("Stopping OcsServer Server listening for gRPC traffic on {}", port)
server.shutdown()
blockUntilShutdown()
}

// Used for unit testing
fun forceStop() {
logger.info("Stopping forcefully OcsServer Server listening for gRPC traffic on {}", port)
server.shutdownNow()
}

private fun blockUntilShutdown() {
server.awaitTermination()
}
}
Loading

0 comments on commit c47c7ed

Please sign in to comment.