Skip to content

Commit

Permalink
create a fallback strategy for kafka-template and handle multiple tem…
Browse files Browse the repository at this point in the history
…plate situation safely #574 (#575)
  • Loading branch information
osoykan authored Sep 4, 2024
1 parent 5fc7622 commit fee0c57
Showing 1 changed file with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import com.trendyol.stove.testing.e2e.messaging.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.system.abstractions.*
import kotlinx.coroutines.*
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.header.internals.RecordHeader
import org.slf4j.*
import org.springframework.beans.factory.getBean
import org.springframework.beans.factory.*
import org.springframework.context.ApplicationContext
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.RecordInterceptor
import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.*
import kotlin.time.Duration.Companion.seconds

@KafkaDsl
Expand Down Expand Up @@ -42,8 +42,39 @@ class KafkaSystem(
override suspend fun afterRun(context: ApplicationContext) {
applicationContext = context
checkIfInterceptorConfiguredProperly(context)
kafkaTemplate = context.getBean()
kafkaTemplate.setProducerListener(getInterceptor())
kafkaTemplate = createKafkaTemplate(context, exposedConfiguration)
}

private fun createKafkaTemplate(context: ApplicationContext, exposedConfiguration: KafkaExposedConfiguration): KafkaTemplate<Any, Any> {
val kafkaTemplates: Map<String, KafkaTemplate<Any, Any>> = context.getBeansOfType()
return kafkaTemplates
.values
.onEach {
it.setProducerListener(getInterceptor())
it.setCloseTimeout(1.seconds.toJavaDuration())
}
.firstOrNone {
it.producerFactory.configurationProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] == exposedConfiguration.bootstrapServers
}
.getOrElse {
logger.warn("No KafkaTemplate found for the configured bootstrap servers, using a fallback KafkaTemplate")
createFallbackTemplate(exposedConfiguration)
}
}

private fun createFallbackTemplate(exposedConfiguration: KafkaExposedConfiguration): KafkaTemplate<Any, Any> {
val producerFactory = DefaultKafkaProducerFactory<Any, Any>(
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer"
)
)
val fallbackTemplate = KafkaTemplate(producerFactory).also {
it.setProducerListener(getInterceptor())
it.setCloseTimeout(1.seconds.toJavaDuration())
}
return fallbackTemplate
}

private fun checkIfInterceptorConfiguredProperly(context: ApplicationContext) {
Expand Down

0 comments on commit fee0c57

Please sign in to comment.