@@ -15,6 +15,7 @@ import Control.Concurrent.Class.MonadSTM (
15
15
labelTVarIO ,
16
16
newTVarIO ,
17
17
stateTVar ,
18
+ writeTVar ,
18
19
)
19
20
import Control.Monad.Trans.Writer (execWriter , tell )
20
21
import Hydra.API.ClientInput (ClientInput )
@@ -33,7 +34,7 @@ import Hydra.Chain.Direct.Tx (verificationKeyToOnChainId)
33
34
import Hydra.Chain.Direct.Util (readFileTextEnvelopeThrow )
34
35
import Hydra.Crypto (AsType (AsHydraKey ))
35
36
import Hydra.Environment (Environment (.. ))
36
- import Hydra.Events (EventSink (.. ), EventSource (.. ), StateEvent (.. ), putEventToSinks , putEventsToSinks , stateChanged )
37
+ import Hydra.Events (EventId , EventSink (.. ), EventSource (.. ), StateEvent (.. ), getEventId , putEventToSinks , putEventsToSinks , stateChanged )
37
38
import Hydra.HeadLogic (
38
39
Effect (.. ),
39
40
HeadState (.. ),
@@ -179,6 +180,7 @@ hydrate ::
179
180
m (WetHydraNode tx m )
180
181
hydrate eventSource eventSinks dryNode = do
181
182
events <- getEvents eventSource
183
+ let lastSeenEventId = getEventId . last <$> nonEmpty events
182
184
traceWith tracer LoadedState {numberOfEvents = fromIntegral $ length events}
183
185
let headState = recoverState initialState (stateChanged <$> events)
184
186
chainStateHistory = recoverChainStateHistory initialChainState (stateChanged <$> events)
@@ -187,7 +189,7 @@ hydrate eventSource eventSinks dryNode = do
187
189
-- deliver to sinks per spec, deduplication is handled by the sinks
188
190
-- FIXME(Elaine): persistence currently not handling duplication, so this relies on not providing the eventSource's sink as an arg here
189
191
putEventsToSinks eventSinks events
190
- nodeState <- createNodeState headState
192
+ nodeState <- createNodeState lastSeenEventId headState
191
193
inputQueue <- createInputQueue
192
194
pure
193
195
WetHydraNode
@@ -312,13 +314,17 @@ processNextInput HydraNode{nodeState, ledger, env} e =
312
314
313
315
computeOutcome = HeadLogic. update env ledger
314
316
315
- processStateChanges :: Monad m => HydraNode tx m -> [StateChanged tx ] -> m ()
317
+ processStateChanges :: MonadSTM m => HydraNode tx m -> [StateChanged tx ] -> m ()
316
318
processStateChanges node stateChanges = do
317
- -- TODO: obviously don't do this
318
- let events = zipWith (\ eventId stateChanged -> StateEvent {eventId, stateChanged}) [0 .. ] stateChanges
319
+ events <- atomically . forM stateChanges $ \ stateChanged -> do
320
+ eventId <- getNextEventId
321
+ pure StateEvent {eventId, stateChanged}
319
322
forM_ events $ putEventToSinks eventSinks
320
323
where
321
- HydraNode {eventSinks} = node
324
+ HydraNode
325
+ { eventSinks
326
+ , nodeState = NodeState {getNextEventId}
327
+ } = node
322
328
323
329
processEffects ::
324
330
( MonadAsync m
@@ -358,17 +364,29 @@ processEffects node tracer inputId effects = do
358
364
data NodeState tx m = NodeState
359
365
{ modifyHeadState :: forall a . (HeadState tx -> (a , HeadState tx )) -> STM m a
360
366
, queryHeadState :: STM m (HeadState tx )
367
+ , getNextEventId :: STM m EventId
361
368
}
362
369
363
370
-- | Initialize a new 'NodeState'.
364
- createNodeState :: MonadLabelledSTM m => HeadState tx -> m (NodeState tx m )
365
- createNodeState initialState = do
366
- tv <- newTVarIO initialState
367
- labelTVarIO tv " node-state"
371
+ createNodeState ::
372
+ MonadLabelledSTM m =>
373
+ -- | Last seen 'EventId'.
374
+ Maybe EventId ->
375
+ HeadState tx ->
376
+ m (NodeState tx m )
377
+ createNodeState lastSeenEventId initialState = do
378
+ nextEventIdV <- newTVarIO $ maybe 0 (+ 1 ) lastSeenEventId
379
+ labelTVarIO nextEventIdV " next-event-id"
380
+ hs <- newTVarIO initialState
381
+ labelTVarIO hs " head-state"
368
382
pure
369
383
NodeState
370
- { modifyHeadState = stateTVar tv
371
- , queryHeadState = readTVar tv
384
+ { modifyHeadState = stateTVar hs
385
+ , queryHeadState = readTVar hs
386
+ , getNextEventId = do
387
+ eventId <- readTVar nextEventIdV
388
+ writeTVar nextEventIdV $ eventId + 1
389
+ pure eventId
372
390
}
373
391
374
392
-- * Logging
0 commit comments