diff --git a/.idea/.gitignore b/.idea/.gitignore index 13566b81..a9d7db9c 100644 --- a/.idea/.gitignore +++ b/.idea/.gitignore @@ -6,3 +6,5 @@ # Datasource local storage ignored files /dataSources/ /dataSources.local.xml +# GitHub Copilot persisted chat sessions +/copilot/chatSessions diff --git a/pom.xml b/pom.xml index 0308cc99..f2ebd556 100644 --- a/pom.xml +++ b/pom.xml @@ -42,7 +42,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 11 diff --git a/tiny-event-sourcing-lib/pom.xml b/tiny-event-sourcing-lib/pom.xml index ed8badef..36a47ea7 100644 --- a/tiny-event-sourcing-lib/pom.xml +++ b/tiny-event-sourcing-lib/pom.xml @@ -7,7 +7,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-event-sourcing-lib diff --git a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/core/EventSourcingProperties.kt b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/core/EventSourcingProperties.kt index 4bb5baf0..312786d4 100644 --- a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/core/EventSourcingProperties.kt +++ b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/core/EventSourcingProperties.kt @@ -13,8 +13,8 @@ class EventSourcingProperties ( var autoScanEnabled: Boolean = false, var scanPackage: String? = null, var spinLockMaxAttempts: Int = 25, - var maxActiveReaderInactivityPeriod: Duration = 5.minutes, + var maxActiveReaderInactivityPeriod: Duration = 5.seconds, var recordReadIndexCommitPeriod: Int = 10, - val eventReaderHealthCheckPeriod: Duration = 5.seconds, + val eventReaderHealthCheckPeriod: Duration = 3.seconds, var sagasEnabled: Boolean = true, ) diff --git a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/saga/aggregate/stream/SagaEventStream.kt b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/saga/aggregate/stream/SagaEventStream.kt index 01991f85..0c4a9a3e 100644 --- a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/saga/aggregate/stream/SagaEventStream.kt +++ b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/saga/aggregate/stream/SagaEventStream.kt @@ -3,6 +3,7 @@ package ru.quipy.saga.aggregate.stream import kotlinx.coroutines.* import org.slf4j.LoggerFactory import ru.quipy.core.AggregateRegistry +import ru.quipy.core.EventSourcingProperties import ru.quipy.core.EventSourcingService import ru.quipy.domain.Aggregate import ru.quipy.saga.SagaContext @@ -11,6 +12,8 @@ import ru.quipy.saga.SagaStep import ru.quipy.saga.aggregate.api.SagaStepAggregate import ru.quipy.saga.aggregate.logic.SagaStepAggregateState import ru.quipy.streams.* +import ru.quipy.streams.annotation.RetryConf +import ru.quipy.streams.annotation.RetryFailedStrategy import java.util.* import java.util.concurrent.Executors @@ -23,7 +26,8 @@ import java.util.concurrent.Executors class SagaEventStream( private val aggregateRegistry: AggregateRegistry, private val eventsStreamManager: AggregateEventStreamManager, - private val sagaStepEsService: EventSourcingService + private val sagaStepEsService: EventSourcingService, + private val props: EventSourcingProperties, ) { @Volatile private var active = true @@ -31,12 +35,17 @@ class SagaEventStream( private val logger = LoggerFactory.getLogger(SagaEventStream::class.java) fun init() { + if (props.sagasEnabled.not()) { + return + } + val aggregates = aggregateRegistry.getAllAggregates() // todo sukhoa aggregates.filter { it != SagaStepAggregate::class } .forEach { val streamName = "saga::" + it.simpleName - val aggregateStream = eventsStreamManager.createEventStream(streamName, it) + val aggregateStream = eventsStreamManager.createEventStream( + streamName, it, RetryConf(0, RetryFailedStrategy.SKIP_EVENT)) launchSagaEventStream(streamName, aggregateStream) } diff --git a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/AggregateEventStreamManager.kt b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/AggregateEventStreamManager.kt index a97fddc4..2722d2cf 100644 --- a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/AggregateEventStreamManager.kt +++ b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/AggregateEventStreamManager.kt @@ -20,9 +20,6 @@ class AggregateEventStreamManager( ) { private val eventStreamListener: EventStreamListenerImpl = EventStreamListenerImpl()// todo sukhoa make injectable - private val eventStoreReaderDispatcher = Executors.newFixedThreadPool(16).asCoroutineDispatcher() - private val eventStreamsDispatcher = Executors.newFixedThreadPool(32).asCoroutineDispatcher() // todo sukhoa fix - private val eventStreams = ConcurrentHashMap>() fun createEventStream( @@ -39,7 +36,7 @@ class AggregateEventStreamManager( aggregateInfo as AggregateRegistry.BasicAggregateInfo, eventSourcingProperties, eventStreamListener, - eventStoreReaderDispatcher + Executors.newFixedThreadPool(1).asCoroutineDispatcher() // eventStoreReaderDispatcher ) val eventsChannel = EventsChannel() @@ -53,7 +50,7 @@ class AggregateEventStreamManager( eventStoreReader, retryConfig, eventStreamListener, - eventStreamsDispatcher + Executors.newFixedThreadPool(1).asCoroutineDispatcher() // eventStreamsDispatcher ) ) diff --git a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/BufferedAggregateEventStream.kt b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/BufferedAggregateEventStream.kt index 33c409ea..bc618cca 100644 --- a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/BufferedAggregateEventStream.kt +++ b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/BufferedAggregateEventStream.kt @@ -54,8 +54,6 @@ class BufferedAggregateEventStream( delay(500) } - val startTs = System.currentTimeMillis() - val eventsBatch = eventReader.read(streamBatchSize) if (eventsBatch.isEmpty()) { @@ -64,17 +62,13 @@ class BufferedAggregateEventStream( } eventsBatch.forEach { eventRecord -> - logger.trace("Processing event from batch: $eventRecord.") + logger.trace("Processing event from batch: {}.", eventRecord) feedToHandling(eventRecord) { eventStreamNotifier.onRecordHandledSuccessfully(streamName, eventRecord.eventTitle) eventReader.acknowledgeRecord(eventRecord) } } - val executionTime = System.currentTimeMillis() - startTs - if (executionTime < streamReadPeriod) { - delay(streamReadPeriod - executionTime) - } } }.also { it.invokeOnCompletion(eventStreamCompletionHandler) @@ -82,13 +76,13 @@ class BufferedAggregateEventStream( override suspend fun handleNextRecord(eventProcessingFunction: suspend (EventRecord) -> Boolean) { val receivedRecord = eventsChannel.receiveEvent() - logger.trace("Event $receivedRecord was received for handling") + logger.trace("Event {} was received for handling", receivedRecord) try { eventProcessingFunction(receivedRecord).also { if (!it) logger.info("Processing function return false for event record: $receivedRecord") - logger.trace("Sending confirmation on receiving event $receivedRecord") + logger.trace("Sending confirmation on receiving event {}", receivedRecord) eventsChannel.sendConfirmation(isConfirmed = it) } } catch (e: Exception) { diff --git a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/EventStreamSubscriber.kt b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/EventStreamSubscriber.kt index 7a913900..57d4378b 100644 --- a/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/EventStreamSubscriber.kt +++ b/tiny-event-sourcing-lib/src/main/kotlin/ru/quipy/streams/EventStreamSubscriber.kt @@ -45,17 +45,17 @@ class EventStreamSubscriber( private val logger: Logger = LoggerFactory.getLogger(EventStreamSubscriber::class.java) private val subscriptionCoroutine: Job = CoroutineScope( - CoroutineName("handlingCoroutine") + Executors.newSingleThreadExecutor() - .asCoroutineDispatcher() // todo sukhoa customize + CoroutineName("handlingCoroutine") + Executors.newSingleThreadExecutor().asCoroutineDispatcher() ).launch { while (active) { aggregateEventStream.handleNextRecord { eventRecord -> try { - val event = payloadToEvent(eventRecord.payload, eventRecord.eventTitle) - logger.trace("Event record $eventRecord was converted to event $event") - - val eventHandler = handlers[event::class] - eventHandler?.invoke(event) + val eventType = nameToEventClassFunc(eventRecord.eventTitle) + handlers[eventType]?.let { handler -> + val event = payloadToEvent(eventRecord.payload, eventType) + logger.trace("Event record {} was converted to event {}", eventRecord, event) + handler.invoke(event) + } true } catch (e: Exception) { logger.error("Unexpected exception while handling event in subscriber. Stream: ${aggregateEventStream.streamName}, event record: $eventRecord", e) @@ -65,9 +65,9 @@ class EventStreamSubscriber( } } - private fun payloadToEvent(payload: String, eventTitle: String): Event = eventMapper.toEvent( + private fun payloadToEvent(payload: String, eventType: KClass>): Event = eventMapper.toEvent( payload, - nameToEventClassFunc(eventTitle) + eventType ) /** diff --git a/tiny-event-sourcing-sagas-projections/pom.xml b/tiny-event-sourcing-sagas-projections/pom.xml index c17e06f6..495a937f 100644 --- a/tiny-event-sourcing-sagas-projections/pom.xml +++ b/tiny-event-sourcing-sagas-projections/pom.xml @@ -8,7 +8,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-event-sourcing-sagas-projections diff --git a/tiny-event-sourcing-spring-app/pom.xml b/tiny-event-sourcing-spring-app/pom.xml index db0456a7..d2ef29d2 100644 --- a/tiny-event-sourcing-spring-app/pom.xml +++ b/tiny-event-sourcing-spring-app/pom.xml @@ -8,7 +8,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-event-sourcing-spring-app diff --git a/tiny-event-sourcing-spring-boot-starter/pom.xml b/tiny-event-sourcing-spring-boot-starter/pom.xml index 58652f12..4e227064 100644 --- a/tiny-event-sourcing-spring-boot-starter/pom.xml +++ b/tiny-event-sourcing-spring-boot-starter/pom.xml @@ -7,13 +7,13 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-event-sourcing-spring-boot-starter Tiny event sourcing spring boot starter tiny-event-sourcing-spring-boot-starter - 2.6.2 + 2.6.3 17 diff --git a/tiny-event-sourcing-spring-boot-starter/src/main/kotlin/ru/quipy/config/EventSourcingLibConfig.kt b/tiny-event-sourcing-spring-boot-starter/src/main/kotlin/ru/quipy/config/EventSourcingLibConfig.kt index 48c33eb8..35acac25 100644 --- a/tiny-event-sourcing-spring-boot-starter/src/main/kotlin/ru/quipy/config/EventSourcingLibConfig.kt +++ b/tiny-event-sourcing-spring-boot-starter/src/main/kotlin/ru/quipy/config/EventSourcingLibConfig.kt @@ -138,6 +138,7 @@ class EventSourcingLibConfig { fun sagaEventStream( aggregateRegistry: AggregateRegistry, eventStreamManager: AggregateEventStreamManager, - sagaStepEsService: EventSourcingService - ) = SagaEventStream(aggregateRegistry, eventStreamManager, sagaStepEsService) + sagaStepEsService: EventSourcingService, + configProperties: EventSourcingProperties, + ) = SagaEventStream(aggregateRegistry, eventStreamManager, sagaStepEsService, configProperties) } \ No newline at end of file diff --git a/tiny-mongo-event-store-spring-boot-starter/pom.xml b/tiny-mongo-event-store-spring-boot-starter/pom.xml index 094d6737..5b6198e8 100644 --- a/tiny-mongo-event-store-spring-boot-starter/pom.xml +++ b/tiny-mongo-event-store-spring-boot-starter/pom.xml @@ -7,7 +7,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-mongo-event-store-spring-boot-starter diff --git a/tiny-mongo-event-store/pom.xml b/tiny-mongo-event-store/pom.xml index df4abe3e..ae7f0cfc 100644 --- a/tiny-mongo-event-store/pom.xml +++ b/tiny-mongo-event-store/pom.xml @@ -8,7 +8,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 tiny-mongo-event-store diff --git a/tiny-postgres-event-store-spring-boot-starter/pom.xml b/tiny-postgres-event-store-spring-boot-starter/pom.xml index a6a18c79..a24cec0b 100644 --- a/tiny-postgres-event-store-spring-boot-starter/pom.xml +++ b/tiny-postgres-event-store-spring-boot-starter/pom.xml @@ -6,7 +6,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 4.0.0 diff --git a/tiny-postgres-event-store-spring-boot-starter/src/main/resources/liquibase/changelog.sql b/tiny-postgres-event-store-spring-boot-starter/src/main/resources/liquibase/changelog.sql index 63ea4e96..f03a4224 100644 --- a/tiny-postgres-event-store-spring-boot-starter/src/main/resources/liquibase/changelog.sql +++ b/tiny-postgres-event-store-spring-boot-starter/src/main/resources/liquibase/changelog.sql @@ -2,6 +2,7 @@ --changeset vekajp:es-001 create sequence if not exists ${schema}.event_record_created_at_sequence; + create table if not exists ${schema}.event_record ( id text primary key, @@ -14,6 +15,10 @@ create table if not exists ${schema}.event_record created_at bigint default nextval('${schema}.event_record_created_at_sequence'), unique (aggregate_id, aggregate_version) ); + +CREATE INDEX idx_created_at ON event_sourcing_store.event_record(aggregate_table_name, created_at); +CREATE INDEX idx_aggregate_id_version ON event_sourcing_store.event_record(aggregate_id, aggregate_version); + create table if not exists ${schema}.snapshot ( id text primary key, diff --git a/tiny-postgresql-event-store/pom.xml b/tiny-postgresql-event-store/pom.xml index 2bf7a408..d8b0f651 100644 --- a/tiny-postgresql-event-store/pom.xml +++ b/tiny-postgresql-event-store/pom.xml @@ -5,7 +5,7 @@ ru.quipy tiny-event-sourcing - 2.6.2 + 2.6.3 4.0.0