Skip to content

Commit

Permalink
Asukhovitsky Separate streams threads, create postgres ES indexes, in…
Browse files Browse the repository at this point in the history
…crease max inactive interval to prevent blocking the app (#49)
  • Loading branch information
andrsuh authored May 10, 2024
1 parent dad78e0 commit cd1adac
Show file tree
Hide file tree
Showing 17 changed files with 47 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>

<properties>
<java.version>11</java.version>
Expand Down
2 changes: 1 addition & 1 deletion tiny-event-sourcing-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-event-sourcing-lib</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ class EventSourcingProperties (
var autoScanEnabled: Boolean = false,
var scanPackage: String? = null,
var spinLockMaxAttempts: Int = 25,
var maxActiveReaderInactivityPeriod: Duration = 5.minutes,
var maxActiveReaderInactivityPeriod: Duration = 5.seconds,
var recordReadIndexCommitPeriod: Int = 10,
val eventReaderHealthCheckPeriod: Duration = 5.seconds,
val eventReaderHealthCheckPeriod: Duration = 3.seconds,
var sagasEnabled: Boolean = true,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ru.quipy.saga.aggregate.stream
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import ru.quipy.core.AggregateRegistry
import ru.quipy.core.EventSourcingProperties
import ru.quipy.core.EventSourcingService
import ru.quipy.domain.Aggregate
import ru.quipy.saga.SagaContext
Expand All @@ -11,6 +12,8 @@ import ru.quipy.saga.SagaStep
import ru.quipy.saga.aggregate.api.SagaStepAggregate
import ru.quipy.saga.aggregate.logic.SagaStepAggregateState
import ru.quipy.streams.*
import ru.quipy.streams.annotation.RetryConf
import ru.quipy.streams.annotation.RetryFailedStrategy
import java.util.*
import java.util.concurrent.Executors

Expand All @@ -23,20 +26,26 @@ import java.util.concurrent.Executors
class SagaEventStream(
private val aggregateRegistry: AggregateRegistry,
private val eventsStreamManager: AggregateEventStreamManager,
private val sagaStepEsService: EventSourcingService<UUID, SagaStepAggregate, SagaStepAggregateState>
private val sagaStepEsService: EventSourcingService<UUID, SagaStepAggregate, SagaStepAggregateState>,
private val props: EventSourcingProperties,
) {
@Volatile
private var active = true
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val logger = LoggerFactory.getLogger(SagaEventStream::class.java)

fun init() {
if (props.sagasEnabled.not()) {
return
}

val aggregates = aggregateRegistry.getAllAggregates()
// todo sukhoa
aggregates.filter { it != SagaStepAggregate::class }
.forEach {
val streamName = "saga::" + it.simpleName
val aggregateStream = eventsStreamManager.createEventStream(streamName, it)
val aggregateStream = eventsStreamManager.createEventStream(
streamName, it, RetryConf(0, RetryFailedStrategy.SKIP_EVENT))

launchSagaEventStream(streamName, aggregateStream)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ class AggregateEventStreamManager(
) {
private val eventStreamListener: EventStreamListenerImpl = EventStreamListenerImpl()// todo sukhoa make injectable

private val eventStoreReaderDispatcher = Executors.newFixedThreadPool(16).asCoroutineDispatcher()
private val eventStreamsDispatcher = Executors.newFixedThreadPool(32).asCoroutineDispatcher() // todo sukhoa fix

private val eventStreams = ConcurrentHashMap<String, AggregateEventStream<*>>()

fun <A : Aggregate> createEventStream(
Expand All @@ -39,7 +36,7 @@ class AggregateEventStreamManager(
aggregateInfo as AggregateRegistry.BasicAggregateInfo<Aggregate>,
eventSourcingProperties,
eventStreamListener,
eventStoreReaderDispatcher
Executors.newFixedThreadPool(1).asCoroutineDispatcher() // eventStoreReaderDispatcher
)

val eventsChannel = EventsChannel()
Expand All @@ -53,7 +50,7 @@ class AggregateEventStreamManager(
eventStoreReader,
retryConfig,
eventStreamListener,
eventStreamsDispatcher
Executors.newFixedThreadPool(1).asCoroutineDispatcher() // eventStreamsDispatcher
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class BufferedAggregateEventStream<A : Aggregate>(
delay(500)
}

val startTs = System.currentTimeMillis()

val eventsBatch = eventReader.read(streamBatchSize)

if (eventsBatch.isEmpty()) {
Expand All @@ -64,31 +62,27 @@ class BufferedAggregateEventStream<A : Aggregate>(
}

eventsBatch.forEach { eventRecord ->
logger.trace("Processing event from batch: $eventRecord.")
logger.trace("Processing event from batch: {}.", eventRecord)

feedToHandling(eventRecord) {
eventStreamNotifier.onRecordHandledSuccessfully(streamName, eventRecord.eventTitle)
eventReader.acknowledgeRecord(eventRecord)
}
}
val executionTime = System.currentTimeMillis() - startTs
if (executionTime < streamReadPeriod) {
delay(streamReadPeriod - executionTime)
}
}
}.also {
it.invokeOnCompletion(eventStreamCompletionHandler)
}

override suspend fun handleNextRecord(eventProcessingFunction: suspend (EventRecord) -> Boolean) {
val receivedRecord = eventsChannel.receiveEvent()
logger.trace("Event $receivedRecord was received for handling")
logger.trace("Event {} was received for handling", receivedRecord)

try {
eventProcessingFunction(receivedRecord).also {
if (!it) logger.info("Processing function return false for event record: $receivedRecord")

logger.trace("Sending confirmation on receiving event $receivedRecord")
logger.trace("Sending confirmation on receiving event {}", receivedRecord)
eventsChannel.sendConfirmation(isConfirmed = it)
}
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ class EventStreamSubscriber<A : Aggregate>(
private val logger: Logger = LoggerFactory.getLogger(EventStreamSubscriber::class.java)

private val subscriptionCoroutine: Job = CoroutineScope(
CoroutineName("handlingCoroutine") + Executors.newSingleThreadExecutor()
.asCoroutineDispatcher() // todo sukhoa customize
CoroutineName("handlingCoroutine") + Executors.newSingleThreadExecutor().asCoroutineDispatcher()
).launch {
while (active) {
aggregateEventStream.handleNextRecord { eventRecord ->
try {
val event = payloadToEvent(eventRecord.payload, eventRecord.eventTitle)
logger.trace("Event record $eventRecord was converted to event $event")

val eventHandler = handlers[event::class]
eventHandler?.invoke(event)
val eventType = nameToEventClassFunc(eventRecord.eventTitle)
handlers[eventType]?.let { handler ->
val event = payloadToEvent(eventRecord.payload, eventType)
logger.trace("Event record {} was converted to event {}", eventRecord, event)
handler.invoke(event)
}
true
} catch (e: Exception) {
logger.error("Unexpected exception while handling event in subscriber. Stream: ${aggregateEventStream.streamName}, event record: $eventRecord", e)
Expand All @@ -65,9 +65,9 @@ class EventStreamSubscriber<A : Aggregate>(
}
}

private fun payloadToEvent(payload: String, eventTitle: String): Event<A> = eventMapper.toEvent(
private fun payloadToEvent(payload: String, eventType: KClass<Event<A>>): Event<A> = eventMapper.toEvent(
payload,
nameToEventClassFunc(eventTitle)
eventType
)

/**
Expand Down
2 changes: 1 addition & 1 deletion tiny-event-sourcing-sagas-projections/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-event-sourcing-sagas-projections</name>
Expand Down
2 changes: 1 addition & 1 deletion tiny-event-sourcing-spring-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-event-sourcing-spring-app</name>
Expand Down
4 changes: 2 additions & 2 deletions tiny-event-sourcing-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-event-sourcing-spring-boot-starter</name>
<description>Tiny event sourcing spring boot starter</description>
<artifactId>tiny-event-sourcing-spring-boot-starter</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class EventSourcingLibConfig {
fun sagaEventStream(
aggregateRegistry: AggregateRegistry,
eventStreamManager: AggregateEventStreamManager,
sagaStepEsService: EventSourcingService<UUID, SagaStepAggregate, SagaStepAggregateState>
) = SagaEventStream(aggregateRegistry, eventStreamManager, sagaStepEsService)
sagaStepEsService: EventSourcingService<UUID, SagaStepAggregate, SagaStepAggregateState>,
configProperties: EventSourcingProperties,
) = SagaEventStream(aggregateRegistry, eventStreamManager, sagaStepEsService, configProperties)
}
2 changes: 1 addition & 1 deletion tiny-mongo-event-store-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-mongo-event-store-spring-boot-starter</name>
Expand Down
2 changes: 1 addition & 1 deletion tiny-mongo-event-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<name>tiny-mongo-event-store</name>
Expand Down
2 changes: 1 addition & 1 deletion tiny-postgres-event-store-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
--changeset vekajp:es-001

create sequence if not exists ${schema}.event_record_created_at_sequence;

create table if not exists ${schema}.event_record
(
id text primary key,
Expand All @@ -14,6 +15,10 @@ create table if not exists ${schema}.event_record
created_at bigint default nextval('${schema}.event_record_created_at_sequence'),
unique (aggregate_id, aggregate_version)
);

CREATE INDEX idx_created_at ON event_sourcing_store.event_record(aggregate_table_name, created_at);
CREATE INDEX idx_aggregate_id_version ON event_sourcing_store.event_record(aggregate_id, aggregate_version);

create table if not exists ${schema}.snapshot
(
id text primary key,
Expand Down
2 changes: 1 addition & 1 deletion tiny-postgresql-event-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>ru.quipy</groupId>
<artifactId>tiny-event-sourcing</artifactId>
<version>2.6.2</version>
<version>2.6.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down

0 comments on commit cd1adac

Please sign in to comment.