Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ts-1083] raw-parsed-toggle #3

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM gradle:6.6-jdk11 AS build
FROM gradle:7.1-jdk11 AS build
ARG Prelease_version=0.0.0
COPY ./ .
RUN gradle clean build dockerPrepare -Prelease_version=${Prelease_version}
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Lightweight data provider (1.1.0)
# Lightweight data provider (1.1.1)

# Overview
This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -29,6 +29,7 @@ This component is similar to [rpt-data-provider](https://github.com/th2-net/th2-
- `resultCountLimit` - number - Sets the maximum amount of messages to return. Defaults to `null (unlimited)`.
- `endTimestamp` - number, unix timestamp in milliseconds - Sets the timestamp to which the search will be performed, starting with `startTimestamp`. When `searchDirection` is `previous`, `endTimestamp` must be less then `startTimestamp`. Defaults to `null` (search can be stopped after reaching `resultCountLimit`).
- `onlyRaw` - boolean - Disabling decoding messages. If it is true, message body will be empty in all messages. Default `false`
- `responseFormats` - text, accepts multiple values - sets response formats. Possible values: BASE_64, PARSED. default value - BASE_64 & PARSED.


Elements in channel match the format sse:
Expand Down Expand Up @@ -58,6 +59,8 @@ spec:
# maxBufferDecodeQueue: 10000 # buffer size for messages that sent to decode but anwers hasn't been received
# decodingTimeout: 60000 # timeout expecting answers from codec.
# batchSize: 100 # batch size from codecs
# codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false)
# responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED)


pins: # pins are used to communicate with codec components to parse message data
Expand Down Expand Up @@ -92,4 +95,4 @@ spec:
requests:
memory: 300Mi
cpu: 50m
```
```
10 changes: 5 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ repositories {
}

dependencies {
api platform('com.exactpro.th2:bom:3.0.0')
api platform('com.exactpro.th2:bom:3.2.0')

implementation 'org.slf4j:slf4j-api'
implementation 'org.slf4j:slf4j-log4j12'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -66,9 +66,9 @@ dependencies {
implementation('net.jpountz.lz4:lz4:1.3.0') {
because('cassandra driver requires lz4 impl in classpath for compression')
}
implementation 'com.exactpro.th2:grpc-data-provider:1.1.0'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
implementation 'com.exactpro.th2:grpc-data-provider:1.1.0-TS-1083-response-formats-2704627821-SNAPSHOT'

implementation 'io.github.microutils:kotlin-logging:2.1.14'
implementation 'io.github.microutils:kotlin-logging:2.1.23'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved

implementation('io.prometheus:simpleclient') {
because('need add custom metrics to provider')
Expand All @@ -83,7 +83,7 @@ dependencies {
}

testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2'
testImplementation 'org.assertj:assertj-core:3.12.2'
testImplementation 'org.assertj:assertj-core:3.23.1'
}

test {
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
################################################################################
# Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
# Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,6 @@

kotlin.code.style=official

release_version=1.1.0
release_version=1.1.1

docker_image_name=
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
################################################################################
# Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
# Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
################################################################################

#Tue Apr 14 11:21:33 MSK 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package com.exactpro.th2.lwdataprovider

import com.exactpro.th2.common.grpc.MessageBatch
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.QueueAttribute
import com.exactpro.th2.lwdataprovider.configuration.Configuration
Expand All @@ -37,7 +35,11 @@ class RabbitMqDecoder(private val configuration: Configuration,
companion object {
private val logger = KotlinLogging.logger { }
}


fun sendAllBatchMessage(batch: MessageGroupBatch) {
this.messageRouterRawBatch.sendAll(batch, QueueAttribute.RAW.value)
}

fun sendBatchMessage(batch: MessageGroupBatch, session: String) {
this.messageRouterRawBatch.send(batch, session, QueueAttribute.RAW.value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class MessageRequestContext (

fun allDataLoadedFromCradle() = allMessagesRequested.set(true)

abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit = {}): RequestedMessageDetails;
abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List<String>, onResponse: () -> Unit = {}): RequestedMessageDetails;
abstract fun addStreamInfo();

override fun onMessageSent() {
Expand Down Expand Up @@ -137,6 +137,7 @@ abstract class RequestedMessageDetails (
@Volatile var time: Long,
val storedMessage: StoredMessage,
protected open val context: MessageRequestContext,
val responseFormats: List<String>,
var parsedMessage: List<Message>? = null,
var rawMessage: RawMessage? = null,
private val onResponse: () -> Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class CustomConfigurationClass {
val mode: String? = null
val grpcBackPressure : Boolean? = null
val bufferPerQuery: Int? = null
val codecUsePinAttributes: Boolean = true
val responseFormats: List<String> = emptyList()
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
}

class Configuration(customConfiguration: CustomConfigurationClass) {
Expand All @@ -47,6 +49,8 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
customConfiguration.mode?.let { Mode.valueOf(it.uppercase(Locale.getDefault())) }, Mode.HTTP)
val grpcBackPressure: Boolean = VariableBuilder.getVariable("grpcBackPressure", customConfiguration.grpcBackPressure, false)
val bufferPerQuery: Int = VariableBuilder.getVariable("bufferPerQuery", customConfiguration.bufferPerQuery, 0)
val codecUsePinAttributes: Boolean = VariableBuilder.getVariable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes, true)
val responseFormats: List<String> = VariableBuilder.getVariable("responseFormats", customConfiguration.responseFormats, emptyList())
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
}

enum class Mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan

private val storage = cradleManager.storage
private val batchSize = configuration.batchSize
private val codecUsePinAttributes = configuration.codecUsePinAttributes

companion object {
private val logger = KotlinLogging.logger { }
}

fun getStreams(): Collection<String> = storage.streams

fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext) {
fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext, responseFormats: List<String>) {

var msgCount = 0
val time = measureTimeMillis {
Expand All @@ -65,7 +66,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
msgId = storedMessage.id
val id = storedMessage.id.toString()
val decodingStep = requestContext.startStep("decoding")
val tmp = requestContext.createMessageDetails(id, 0, storedMessage) { decodingStep.finish() }
val tmp = requestContext.createMessageDetails(id, 0, storedMessage, responseFormats) { decodingStep.finish() }
messageBuffer.add(tmp)
++msgBufferCount
tmp.rawMessage = requestContext.startStep("raw_message_parsing").use { RawMessage.parseFrom(storedMessage.content) }.also {
Expand All @@ -79,7 +80,11 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
decoder.registerMessage(it)
requestContext.registerMessage(it)
}
decoder.sendBatchMessage(builder.build(), sessionName)
if (codecUsePinAttributes) {
decoder.sendBatchMessage(builder.build(), sessionName)
} else {
decoder.sendAllBatchMessage(builder.build())
}

messageBuffer.clear()
builder.clear()
Expand All @@ -101,7 +106,11 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
}

if (msgBufferCount > 0) {
decoder.sendBatchMessage(builder.build(), sessionName)
if (codecUsePinAttributes) {
decoder.sendBatchMessage(builder.build(), sessionName)
} else {
decoder.sendAllBatchMessage(builder.build())
}
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved

val sendingTime = System.currentTimeMillis()
messageBuffer.forEach {
Expand Down Expand Up @@ -136,7 +145,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
}
msgId = storedMessageBatch.id
val id = storedMessageBatch.id.toString()
val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch)
val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch, emptyList())
tmp.rawMessage = RawMessage.parseFrom(storedMessageBatch.content)
tmp.responseMessage()
msgCount++
Expand Down Expand Up @@ -166,7 +175,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan

val time = System.currentTimeMillis()
val decodingStep = if (onlyRaw) null else requestContext.startStep("decoding")
val tmp = requestContext.createMessageDetails(message.id.toString(), time, message) { decodingStep?.finish() }
val tmp = requestContext.createMessageDetails(message.id.toString(), time, message, emptyList()) { decodingStep?.finish() }
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
tmp.rawMessage = RawMessage.parseFrom(message.content)
requestContext.loadedMessages += 1

Expand All @@ -178,7 +187,12 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
.build()
decoder.registerMessage(tmp)
requestContext.registerMessage(tmp)
decoder.sendBatchMessage(msgBatch, message.streamName)
if (codecUsePinAttributes) {
decoder.sendBatchMessage(msgBatch, message.streamName)
} else {
decoder.sendAllBatchMessage(msgBatch)
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ data class SseMessageSearchRequest(
val attachedEvents: Boolean,
val lookupLimitDays: Int?,
val resumeFromIdsList: List<StoredMessageId>?,
val onlyRaw: Boolean
val onlyRaw: Boolean,
val responseFormats: List<String>?
) {

companion object {
Expand Down Expand Up @@ -86,7 +87,8 @@ data class SseMessageSearchRequest(
keepOpen = parameters["keepOpen"]?.firstOrNull()?.toBoolean() ?: false,
attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false,
lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toInt(),
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false,
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
responseFormats = parameters["responseFormats"]
)

constructor(grpcRequest: MessageSearchRequest) : this(
Expand All @@ -99,7 +101,8 @@ data class SseMessageSearchRequest(
keepOpen = if (grpcRequest.hasKeepOpen()) grpcRequest.keepOpen.value else false,
attachedEvents = false, // disabled
lookupLimitDays = null,
onlyRaw = false // NOT SUPPORTED in GRPC
onlyRaw = false, // NOT SUPPORTED in GRPC
responseFormats = grpcRequest.responseFormatsList
)

private fun checkEndTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ open class GrpcDataProviderImpl(
val grpcResponseHandler = GrpcResponseHandler(queue)
val context = GrpcMessageRequestContext(grpcResponseHandler, maxMessagesPerRequest = configuration.bufferPerQuery)
val loadingStep = context.startStep("messages_loading")
searchMessagesHandler.loadMessages(requestParams, context)
searchMessagesHandler.loadMessages(requestParams, context, configuration)
try {
processResponse(responseObserver, grpcResponseHandler, context, loadingStep::finish) { it.message }
} catch (ex: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class GrpcMessageRequestContext (
maxMessagesPerRequest = maxMessagesPerRequest) {


override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit): GrpcRequestedMessageDetails {
return GrpcRequestedMessageDetails(id, time, storedMessage, this, onResponse)
override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List<String>, onResponse: () -> Unit): GrpcRequestedMessageDetails {
return GrpcRequestedMessageDetails(id, time, storedMessage, this, responseFormats, onResponse)
}

override fun addStreamInfo() {
Expand All @@ -58,10 +58,11 @@ class GrpcRequestedMessageDetails(
time: Long,
storedMessage: StoredMessage,
override val context: GrpcMessageRequestContext,
responseFormats: List<String>,
onResponse: () -> Unit,
parsedMessage: List<Message>? = null,
rawMessage: RawMessage? = null
) : RequestedMessageDetails(id, time, storedMessage, context, parsedMessage, rawMessage, onResponse) {
) : RequestedMessageDetails(id, time, storedMessage, context, responseFormats, parsedMessage, rawMessage, onResponse) {

override fun responseMessageInternal() {
val msg = GrpcMessageProducer.createMessage(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.messages.StoredMessageFilterBuilder
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.lwdataprovider.MessageRequestContext
import com.exactpro.th2.lwdataprovider.configuration.Configuration
import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor
import com.exactpro.th2.lwdataprovider.entities.requests.GetMessageRequest
import com.exactpro.th2.lwdataprovider.entities.requests.SseMessageSearchRequest
Expand All @@ -40,7 +41,7 @@ class SearchMessagesHandler(
return cradleMsgExtractor.getStreams();
}

fun loadMessages(request: SseMessageSearchRequest, requestContext: MessageRequestContext) {
fun loadMessages(request: SseMessageSearchRequest, requestContext: MessageRequestContext, configuration: Configuration) {

if (request.stream == null && request.resumeFromIdsList.isNullOrEmpty()) {
return;
Expand Down Expand Up @@ -72,10 +73,12 @@ class SearchMessagesHandler(

}.build()

if (!request.onlyRaw)
cradleMsgExtractor.getMessages(filter, requestContext)
else
val responseFormats = request.responseFormats ?: configuration.responseFormats
if (request.onlyRaw || (responseFormats.contains("BASE_64") && responseFormats.size == 1)) {
cradleMsgExtractor.getRawMessages(filter, requestContext)
} else {
cradleMsgExtractor.getMessages(filter, requestContext, responseFormats)
}
limitReached = request.resultCountLimit != null && request.resultCountLimit <= requestContext.loadedMessages
}
} else {
Expand All @@ -94,10 +97,12 @@ class SearchMessagesHandler(
request.resultCountLimit?.let { limit(max(it - requestContext.loadedMessages, 0)) }
}.build()

if (!request.onlyRaw)
cradleMsgExtractor.getMessages(filter, requestContext)
else
val responseFormats = request.responseFormats ?: configuration.responseFormats
if (request.onlyRaw || (responseFormats.contains("BASE_64") && responseFormats.size == 1)) {
cradleMsgExtractor.getRawMessages(filter, requestContext)
} else {
cradleMsgExtractor.getMessages(filter, requestContext, responseFormats)
}

limitReached = request.resultCountLimit != null && request.resultCountLimit <= requestContext.loadedMessages
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GetMessagesServlet (
val reqContext = MessageSseRequestContext(sseResponse, queryParametersMap, maxMessagesPerRequest = configuration.bufferPerQuery)
reqContext.startStep("messages_loading").use {
keepAliveHandler.addKeepAliveData(reqContext)
searchMessagesHandler.loadMessages(request, reqContext)
searchMessagesHandler.loadMessages(request, reqContext,configuration)

this.waitAndWrite(queue, resp, reqContext)
keepAliveHandler.removeKeepAliveData(reqContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class MessageSseRequestContext (
maxMessagesPerRequest = maxMessagesPerRequest) {


override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit) : RequestedMessageDetails {
return SseRequestedMessageDetails(id, time, storedMessage, this, onResponse)
override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List<String>, onResponse: () -> Unit) : RequestedMessageDetails {
return SseRequestedMessageDetails(id, time, storedMessage, this, responseFormats, onResponse)
}

override fun addStreamInfo() {
Expand All @@ -54,10 +54,11 @@ class SseRequestedMessageDetails(
time: Long,
storedMessage: StoredMessage,
override val context: MessageSseRequestContext,
responseFormats: List<String>,
onResponse: () -> Unit,
parsedMessage: List<Message>? = null,
rawMessage: RawMessage? = null
) : RequestedMessageDetails(id, time, storedMessage, context, parsedMessage, rawMessage, onResponse) {
) : RequestedMessageDetails(id, time, storedMessage, context, responseFormats, parsedMessage, rawMessage, onResponse) {

override fun responseMessageInternal() {
val msg = MessageProducer53.createMessage(this, context.jsonFormatter)
Expand Down
Loading