@@ -7,28 +7,22 @@ import Hydra.Prelude
7
7
8
8
import Conduit (mapMC , (.|) )
9
9
import Control.Concurrent.Class.MonadSTM (newTVarIO , writeTVar )
10
- import Hydra.Chain.ChainState (IsChainState )
11
- import Hydra.Events (EventSink (.. ), EventSource (.. ), StateEvent (.. ))
10
+ import Hydra.Events ( EventSink (.. ), EventSource (.. ), HasEventId (.. ))
12
11
import Hydra.Persistence (PersistenceIncremental (.. ))
13
12
14
13
-- | A basic file based event source and sink defined using an
15
14
-- 'PersistenceIncremental' handle.
16
- --
17
- -- A new implementation for an 'EventSource' with a compatible 'EventSink' could
18
- -- be defined more generically with constraints:
19
- --
20
- -- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
21
15
eventPairFromPersistenceIncremental ::
22
- (IsChainState tx , MonadSTM m ) =>
23
- PersistenceIncremental ( StateEvent tx ) m ->
24
- m (EventSource ( StateEvent tx ) m , EventSink ( StateEvent tx ) m )
16
+ (ToJSON e , FromJSON e , HasEventId e , MonadSTM m ) =>
17
+ PersistenceIncremental e m ->
18
+ m (EventSource e m , EventSink e m )
25
19
eventPairFromPersistenceIncremental PersistenceIncremental {append, source} = do
26
20
eventIdV <- newTVarIO Nothing
27
21
let
28
22
getLastSeenEventId = readTVar eventIdV
29
23
30
- setLastSeenEventId StateEvent {eventId} = do
31
- writeTVar eventIdV (Just eventId )
24
+ setLastSeenEventId evt = do
25
+ writeTVar eventIdV (Just $ getEventId evt )
32
26
33
27
-- Keep track of the last seen event id when loading
34
28
sourceEvents =
@@ -40,11 +34,11 @@ eventPairFromPersistenceIncremental PersistenceIncremental{append, source} = do
40
34
)
41
35
42
36
-- Filter events that are already stored
43
- putEvent e @ StateEvent {eventId} = do
37
+ putEvent evt = do
44
38
atomically getLastSeenEventId >>= \ case
45
- Nothing -> store e
39
+ Nothing -> store evt
46
40
Just lastSeenEventId
47
- | eventId > lastSeenEventId -> store e
41
+ | getEventId evt > lastSeenEventId -> store evt
48
42
| otherwise -> pure ()
49
43
50
44
store e = do
0 commit comments