Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ts-1083] raw-parsed-toggle #3

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM gradle:6.6-jdk11 AS build
FROM gradle:7.1-jdk11 AS build
ARG Prelease_version=0.0.0
COPY ./ .
RUN gradle clean build dockerPrepare -Prelease_version=${Prelease_version}
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Lightweight data provider (1.1.0)
# Lightweight data provider (1.1.1)

# Overview
This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -29,6 +29,7 @@ This component is similar to [rpt-data-provider](https://github.com/th2-net/th2-
- `resultCountLimit` - number - Sets the maximum amount of messages to return. Defaults to `null (unlimited)`.
- `endTimestamp` - number, unix timestamp in milliseconds - Sets the timestamp to which the search will be performed, starting with `startTimestamp`. When `searchDirection` is `previous`, `endTimestamp` must be less then `startTimestamp`. Defaults to `null` (search can be stopped after reaching `resultCountLimit`).
- `onlyRaw` - boolean - Disabling decoding messages. If it is true, message body will be empty in all messages. Default `false`
- `responseFormats` - text, accepts multiple values - sets response formats. Possible values: BASE_64, PARSED. default value - BASE_64 & PARSED.


Elements in channel match the format sse:
Expand Down Expand Up @@ -58,6 +59,8 @@ spec:
# maxBufferDecodeQueue: 10000 # buffer size for messages that sent to decode but anwers hasn't been received
# decodingTimeout: 60000 # timeout expecting answers from codec.
# batchSize: 100 # batch size from codecs
# codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false)
# responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED)


pins: # pins are used to communicate with codec components to parse message data
Expand Down Expand Up @@ -92,4 +95,4 @@ spec:
requests:
memory: 300Mi
cpu: 50m
```
```
10 changes: 5 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ repositories {
}

dependencies {
api platform('com.exactpro.th2:bom:3.0.0')
api platform('com.exactpro.th2:bom:3.2.0')

implementation 'org.slf4j:slf4j-api'
implementation 'org.slf4j:slf4j-log4j12'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -66,9 +66,9 @@ dependencies {
implementation('net.jpountz.lz4:lz4:1.3.0') {
because('cassandra driver requires lz4 impl in classpath for compression')
}
implementation 'com.exactpro.th2:grpc-data-provider:1.1.0'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
implementation 'com.exactpro.th2:grpc-data-provider:1.1.0-TS-1083-response-formats-2759013731-SNAPSHOT'

implementation 'io.github.microutils:kotlin-logging:2.1.14'
implementation 'io.github.microutils:kotlin-logging:2.1.23'
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved

implementation('io.prometheus:simpleclient') {
because('need add custom metrics to provider')
Expand All @@ -83,7 +83,7 @@ dependencies {
}

testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2'
testImplementation 'org.assertj:assertj-core:3.12.2'
testImplementation 'org.assertj:assertj-core:3.23.1'
}

test {
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
################################################################################
# Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
# Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,6 @@

kotlin.code.style=official

release_version=1.1.0
release_version=1.1.1

docker_image_name=
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
################################################################################
# Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
# Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
################################################################################

#Tue Apr 14 11:21:33 MSK 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.1-bin.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright 2009-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2009-2022 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package com.exactpro.th2.lwdataprovider

import com.exactpro.th2.common.grpc.MessageBatch
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.QueueAttribute
import com.exactpro.th2.lwdataprovider.configuration.Configuration
Expand All @@ -37,9 +35,13 @@ class RabbitMqDecoder(private val configuration: Configuration,
companion object {
private val logger = KotlinLogging.logger { }
}

fun sendBatchMessage(batch: MessageGroupBatch, session: String) {
this.messageRouterRawBatch.send(batch, session, QueueAttribute.RAW.value)

fun sendBatchMessage(batch: MessageGroupBatch, session: String, codecUsePinAttributes: Boolean) {
if (codecUsePinAttributes) {
this.messageRouterRawBatch.send(batch, session, QueueAttribute.RAW.value)
} else {
this.messageRouterRawBatch.sendAll(batch, QueueAttribute.RAW.value)
}
}

fun registerMessage(message: RequestedMessageDetails) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.exactpro.th2.lwdataprovider
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.th2.common.grpc.Message
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import com.exactpro.th2.lwdataprovider.entities.responses.LastScannedObjectInfo
import io.prometheus.client.Histogram
import mu.KotlinLogging
Expand Down Expand Up @@ -85,7 +86,7 @@ abstract class MessageRequestContext (

fun allDataLoadedFromCradle() = allMessagesRequested.set(true)

abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit = {}): RequestedMessageDetails;
abstract fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List<ResponseFormat>, onResponse: () -> Unit = {}): RequestedMessageDetails;
abstract fun addStreamInfo();

override fun onMessageSent() {
Expand Down Expand Up @@ -137,6 +138,7 @@ abstract class RequestedMessageDetails (
@Volatile var time: Long,
val storedMessage: StoredMessage,
protected open val context: MessageRequestContext,
val responseFormats: List<ResponseFormat>,
var parsedMessage: List<Message>? = null,
var rawMessage: RawMessage? = null,
private val onResponse: () -> Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.exactpro.th2.lwdataprovider.configuration

import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import java.util.*


Expand All @@ -31,6 +32,8 @@ class CustomConfigurationClass {
val mode: String? = null
val grpcBackPressure : Boolean? = null
val bufferPerQuery: Int? = null
val codecUsePinAttributes: Boolean = true
val defaultResponseFormats: List<ResponseFormat> = listOf(ResponseFormat.ALL)
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
}

class Configuration(customConfiguration: CustomConfigurationClass) {
Expand All @@ -47,6 +50,9 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
customConfiguration.mode?.let { Mode.valueOf(it.uppercase(Locale.getDefault())) }, Mode.HTTP)
val grpcBackPressure: Boolean = VariableBuilder.getVariable("grpcBackPressure", customConfiguration.grpcBackPressure, false)
val bufferPerQuery: Int = VariableBuilder.getVariable("bufferPerQuery", customConfiguration.bufferPerQuery, 0)
val codecUsePinAttributes: Boolean = VariableBuilder.getVariable("codecUsePinAttributes", customConfiguration.codecUsePinAttributes, true)
val defaultResponseFormats: List<ResponseFormat> = VariableBuilder.getVariable("defaultResponseFormats",
customConfiguration.defaultResponseFormats, listOf(ResponseFormat.ALL))
}

enum class Mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.message.plusAssign
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import com.exactpro.th2.lwdataprovider.MessageRequestContext
import com.exactpro.th2.lwdataprovider.RabbitMqDecoder
import com.exactpro.th2.lwdataprovider.RequestedMessageDetails
Expand All @@ -36,14 +37,15 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan

private val storage = cradleManager.storage
private val batchSize = configuration.batchSize
private val codecUsePinAttributes = configuration.codecUsePinAttributes

companion object {
private val logger = KotlinLogging.logger { }
}

fun getStreams(): Collection<String> = storage.streams

fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext) {
fun getMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext, responseFormats: List<ResponseFormat>) {
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved

var msgCount = 0
val time = measureTimeMillis {
Expand All @@ -65,7 +67,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
msgId = storedMessage.id
val id = storedMessage.id.toString()
val decodingStep = requestContext.startStep("decoding")
val tmp = requestContext.createMessageDetails(id, 0, storedMessage) { decodingStep.finish() }
val tmp = requestContext.createMessageDetails(id, 0, storedMessage, responseFormats) { decodingStep.finish() }
messageBuffer.add(tmp)
++msgBufferCount
tmp.rawMessage = requestContext.startStep("raw_message_parsing").use { RawMessage.parseFrom(storedMessage.content) }.also {
Expand All @@ -79,8 +81,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
decoder.registerMessage(it)
requestContext.registerMessage(it)
}
decoder.sendBatchMessage(builder.build(), sessionName)

decoder.sendBatchMessage(builder.build(), sessionName, codecUsePinAttributes)
messageBuffer.clear()
builder.clear()
msgCount += msgBufferCount
Expand All @@ -101,7 +102,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
}

if (msgBufferCount > 0) {
decoder.sendBatchMessage(builder.build(), sessionName)
decoder.sendBatchMessage(builder.build(), sessionName, codecUsePinAttributes)

val sendingTime = System.currentTimeMillis()
messageBuffer.forEach {
Expand All @@ -121,7 +122,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
}


fun getRawMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext) {
fun getRawMessages(filter: StoredMessageFilter, requestContext: MessageRequestContext, responseFormats: List<ResponseFormat>) {

var msgCount = 0
val time = measureTimeMillis {
Expand All @@ -136,7 +137,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
}
msgId = storedMessageBatch.id
val id = storedMessageBatch.id.toString()
val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch)
val tmp = requestContext.createMessageDetails(id, time, storedMessageBatch, responseFormats)
tmp.rawMessage = RawMessage.parseFrom(storedMessageBatch.content)
tmp.responseMessage()
msgCount++
Expand Down Expand Up @@ -166,7 +167,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan

val time = System.currentTimeMillis()
val decodingStep = if (onlyRaw) null else requestContext.startStep("decoding")
val tmp = requestContext.createMessageDetails(message.id.toString(), time, message) { decodingStep?.finish() }
val tmp = requestContext.createMessageDetails(message.id.toString(), time, message, emptyList()) { decodingStep?.finish() }
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
tmp.rawMessage = RawMessage.parseFrom(message.content)
requestContext.loadedMessages += 1

Expand All @@ -178,7 +179,7 @@ class CradleMessageExtractor(configuration: Configuration, private val cradleMan
.build()
decoder.registerMessage(tmp)
requestContext.registerMessage(tmp)
decoder.sendBatchMessage(msgBatch, message.streamName)
decoder.sendBatchMessage(msgBatch, message.streamName, codecUsePinAttributes)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
package com.exactpro.th2.lwdataprovider.entities.requests

import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import com.exactpro.th2.lwdataprovider.grpc.toStoredMessageId


data class GetMessageRequest(
val msgId: String,
val onlyRaw: Boolean
val onlyRaw: Boolean,
val responseFormats: List<ResponseFormat>? = listOf(ResponseFormat.ALL)
) {

constructor(msgId: String, parameters: Map<String, List<String>>) : this(
msgId = msgId,
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false,
responseFormats = parameters["responseFormats"]?.map { x -> ResponseFormat.valueOf(x) }
)

constructor(msgId: MessageID) : this(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import com.exactpro.cradle.Direction
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import com.exactpro.th2.dataprovider.grpc.MessageStreamPointer
import com.exactpro.th2.lwdataprovider.entities.exceptions.InvalidRequestException
import com.exactpro.th2.lwdataprovider.grpc.toInstant
import com.exactpro.th2.lwdataprovider.grpc.toProviderMessageStreams
import com.exactpro.th2.lwdataprovider.grpc.toProviderRelation
import com.exactpro.th2.lwdataprovider.grpc.toStoredMessageId
import java.time.Instant
import kotlin.streams.toList

data class SseMessageSearchRequest(
val startTimestamp: Instant?,
Expand All @@ -39,7 +39,10 @@ data class SseMessageSearchRequest(
val attachedEvents: Boolean,
val lookupLimitDays: Int?,
val resumeFromIdsList: List<StoredMessageId>?,
val onlyRaw: Boolean
@Deprecated("Use responseFormats instead", ReplaceWith("responseFormats",
"com.exactpro.th2.lwdataprovider.entities.requests"), DeprecationLevel.WARNING)
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
val onlyRaw: Boolean,
val responseFormats: List<ResponseFormat>? = listOf(ResponseFormat.ALL)
) {

companion object {
Expand Down Expand Up @@ -86,7 +89,8 @@ data class SseMessageSearchRequest(
keepOpen = parameters["keepOpen"]?.firstOrNull()?.toBoolean() ?: false,
attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false,
lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toInt(),
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false
onlyRaw = parameters["onlyRaw"]?.firstOrNull()?.toBoolean() ?: false,
Dmitriy-Yugay marked this conversation as resolved.
Show resolved Hide resolved
responseFormats = parameters["responseFormats"]?.map { x -> ResponseFormat.valueOf(x) }
)

constructor(grpcRequest: MessageSearchRequest) : this(
Expand All @@ -99,7 +103,8 @@ data class SseMessageSearchRequest(
keepOpen = if (grpcRequest.hasKeepOpen()) grpcRequest.keepOpen.value else false,
attachedEvents = false, // disabled
lookupLimitDays = null,
onlyRaw = false // NOT SUPPORTED in GRPC
onlyRaw = false, // NOT SUPPORTED in GRPC
responseFormats = grpcRequest.responseFormatsList
)

private fun checkEndTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ open class GrpcDataProviderImpl(
val grpcResponseHandler = GrpcResponseHandler(queue)
val context = GrpcMessageRequestContext(grpcResponseHandler, maxMessagesPerRequest = configuration.bufferPerQuery)
val loadingStep = context.startStep("messages_loading")
searchMessagesHandler.loadMessages(requestParams, context)
searchMessagesHandler.loadMessages(requestParams, context, configuration)
try {
processResponse(responseObserver, grpcResponseHandler, context, loadingStep::finish) { it.message }
} catch (ex: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.exactpro.th2.lwdataprovider.grpc
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.th2.common.grpc.Message
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest.ResponseFormat
import com.exactpro.th2.dataprovider.grpc.MessageSearchResponse
import com.exactpro.th2.dataprovider.grpc.MessageStreamPointers
import com.exactpro.th2.lwdataprovider.GrpcResponseHandler
Expand All @@ -42,8 +43,8 @@ class GrpcMessageRequestContext (
maxMessagesPerRequest = maxMessagesPerRequest) {


override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, onResponse: () -> Unit): GrpcRequestedMessageDetails {
return GrpcRequestedMessageDetails(id, time, storedMessage, this, onResponse)
override fun createMessageDetails(id: String, time: Long, storedMessage: StoredMessage, responseFormats: List<ResponseFormat>, onResponse: () -> Unit): GrpcRequestedMessageDetails {
return GrpcRequestedMessageDetails(id, time, storedMessage, this, responseFormats, onResponse)
}

override fun addStreamInfo() {
Expand All @@ -58,10 +59,11 @@ class GrpcRequestedMessageDetails(
time: Long,
storedMessage: StoredMessage,
override val context: GrpcMessageRequestContext,
responseFormats: List<ResponseFormat>,
onResponse: () -> Unit,
parsedMessage: List<Message>? = null,
rawMessage: RawMessage? = null
) : RequestedMessageDetails(id, time, storedMessage, context, parsedMessage, rawMessage, onResponse) {
) : RequestedMessageDetails(id, time, storedMessage, context, responseFormats, parsedMessage, rawMessage, onResponse) {

override fun responseMessageInternal() {
val msg = GrpcMessageProducer.createMessage(this)
Expand Down
Loading