Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 255f843

Browse files
committedMar 13, 2024··
Move eventPairFromPersistenceIncremental to Hydra.Events.FileBased
This implementation and its tests will serve as an example for an EventSource and EventSink.
1 parent 3ad6cba commit 255f843

File tree

10 files changed

+188
-146
lines changed

10 files changed

+188
-146
lines changed
 

‎hydra-node/hydra-node.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ library
6969
Hydra.Crypto
7070
Hydra.Environment
7171
Hydra.Events
72+
Hydra.Events.FileBased
7273
Hydra.HeadId
7374
Hydra.HeadLogic
7475
Hydra.HeadLogic.Error
@@ -292,6 +293,7 @@ test-suite tests
292293
Hydra.Chain.Direct.WalletSpec
293294
Hydra.ContestationPeriodSpec
294295
Hydra.CryptoSpec
296+
Hydra.Events.FileBasedSpec
295297
Hydra.FireForgetSpec
296298
Hydra.HeadLogicSnapshotSpec
297299
Hydra.HeadLogicSpec

‎hydra-node/src/Hydra/Events.hs

+12-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
55
-- main 'HydraNode' handle to load and send out events.
66
--
7-
-- TODO: add an example event source sink (on top of the persistence one)
7+
-- See 'Hydra.Events.FileBased' for an example implementation and
8+
-- 'Hydra.Events.FileBasedSpec' for the corresponding test suite.
9+
--
10+
-- Custom implementations should be located under Hydra.Events to avoid
11+
-- conflicts.
812
module Hydra.Events where
913

1014
import Hydra.Prelude
@@ -30,16 +34,17 @@ newtype EventSink e m = EventSink
3034
-- ^ Send a single event to the event sink.
3135
}
3236

33-
putEventToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> e -> m ()
34-
putEventToSinks sinks e = forM_ sinks $ \sink -> putEvent sink e
35-
37+
-- | Put a list of events to a list of event sinks in a round-robin fashion.
3638
putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
37-
putEventsToSinks sinks = mapM_ (putEventToSinks sinks)
39+
putEventsToSinks sinks events =
40+
forM_ events $ \event ->
41+
forM_ sinks $ \sink ->
42+
putEvent sink event
3843

3944
-- * State change events as used by Hydra.Node
4045

41-
-- TODO: Move 'StateChanged' here as well?
42-
46+
-- | A state change event with an event id that is the common entity to be
47+
-- loaded from an 'EventSource' and sent to 'EventSink's.
4348
data StateEvent tx = StateEvent
4449
{ eventId :: EventId
4550
, stateChanged :: StateChanged tx
+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
-- | A file-based event source and sink using JSON encoding.
2+
--
3+
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
4+
module Hydra.Events.FileBased where
5+
6+
import Hydra.Prelude
7+
8+
import Hydra.Chain (IsChainState)
9+
import Hydra.Persistence (PersistenceIncremental (..))
10+
import Hydra.Events (EventSource (..), StateEvent (..), EventSink (..))
11+
import Hydra.HeadLogic.Outcome (StateChanged)
12+
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
13+
14+
-- | A basic file based event source and sink defined using an
15+
-- 'PersistenceIncremental' handle.
16+
--
17+
-- The complexity in this implementation mostly stems from the fact that we want
18+
-- to be backward-compatible with the old, plain format of storing
19+
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
20+
--
21+
-- If any 'Legacy StateChanged' items are discovered, a running index is used
22+
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
23+
--
24+
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
25+
-- be defined more generically with constraints:
26+
--
27+
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
28+
eventPairFromPersistenceIncremental ::
29+
(IsChainState tx, MonadSTM m) =>
30+
PersistenceIncremental (PersistedStateChange tx) m ->
31+
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
32+
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
33+
eventIdV <- newTVarIO Nothing
34+
let
35+
getLastSeenEventId = readTVar eventIdV
36+
37+
setLastSeenEventId StateEvent{eventId} = do
38+
writeTVar eventIdV (Just eventId)
39+
40+
getNextEventId =
41+
maybe 0 (+ 1) <$> readTVar eventIdV
42+
43+
-- Keep track of the last seen event id when loading
44+
getEvents = do
45+
items <- loadAll
46+
atomically . forM items $ \i -> do
47+
event <- case i of
48+
New e -> pure e
49+
Legacy sc -> do
50+
eventId <- getNextEventId
51+
pure $ StateEvent eventId sc
52+
53+
setLastSeenEventId event
54+
pure event
55+
56+
-- Filter events that are already stored
57+
putEvent e@StateEvent{eventId} = do
58+
atomically getLastSeenEventId >>= \case
59+
Nothing -> store e
60+
Just lastSeenEventId
61+
| eventId > lastSeenEventId -> store e
62+
| otherwise -> pure ()
63+
64+
store e = do
65+
append (New e)
66+
atomically $ setLastSeenEventId e
67+
68+
pure (EventSource{getEvents}, EventSink{putEvent})
69+
70+
-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
71+
-- compatible with plain usage of 'PersistenceIncrementa' using plain
72+
-- 'StateChanged' items to the new 'StateEvent' persisted items.
73+
data PersistedStateChange tx
74+
= Legacy (StateChanged tx)
75+
| New (StateEvent tx)
76+
deriving stock (Generic, Show, Eq)
77+
78+
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
79+
toJSON = \case
80+
Legacy sc -> toJSON sc
81+
New e -> toJSON e
82+
83+
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
84+
parseJSON v =
85+
New <$> parseJSON v
86+
<|> Legacy <$> parseJSON v

‎hydra-node/src/Hydra/Node.hs

+3-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import Hydra.Chain.Direct.Tx (verificationKeyToOnChainId)
3434
import Hydra.Chain.Direct.Util (readFileTextEnvelopeThrow)
3535
import Hydra.Crypto (AsType (AsHydraKey))
3636
import Hydra.Environment (Environment (..))
37-
import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventToSinks, putEventsToSinks, stateChanged)
37+
import Hydra.Events (EventId, EventSink (..), EventSource (..), StateEvent (..), getEventId, putEventsToSinks, stateChanged)
3838
import Hydra.HeadLogic (
3939
Effect (..),
4040
HeadState (..),
@@ -171,8 +171,7 @@ hydrate tracer env ledger initialChainState eventSource eventSinks = do
171171
chainStateHistory = recoverChainStateHistory initialChainState (stateChanged <$> events)
172172
-- Check whether the loaded state matches our configuration (env)
173173
checkHeadState tracer env headState
174-
-- deliver to sinks per spec, deduplication is handled by the sinks
175-
-- FIXME(Elaine): persistence currently not handling duplication, so this relies on not providing the eventSource's sink as an arg here
174+
-- (Re-)submit events to sinks; de-duplication is handled by the sinks
176175
putEventsToSinks eventSinks events
177176
nodeState <- createNodeState lastSeenEventId headState
178177
inputQueue <- createInputQueue
@@ -302,7 +301,7 @@ processStateChanges node stateChanges = do
302301
events <- atomically . forM stateChanges $ \stateChanged -> do
303302
eventId <- getNextEventId
304303
pure StateEvent{eventId, stateChanged}
305-
forM_ events $ putEventToSinks eventSinks
304+
putEventsToSinks eventSinks events
306305
where
307306
HydraNode
308307
{ eventSinks

‎hydra-node/src/Hydra/Node/Run.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import Hydra.Chain.Direct (loadChainContext, mkTinyWallet, withDirectChain)
1313
import Hydra.Chain.Direct.State (initialChainState)
1414
import Hydra.Chain.Offline (loadGenesisFile, withOfflineChain)
1515
import Hydra.Environment (Environment (..))
16+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
1617
import Hydra.Ledger.Cardano qualified as Ledger
1718
import Hydra.Ledger.Cardano.Configuration (
1819
Globals,
@@ -45,7 +46,7 @@ import Hydra.Options (
4546
RunOptions (..),
4647
validateRunOptions,
4748
)
48-
import Hydra.Persistence (createPersistenceIncremental, eventPairFromPersistenceIncremental)
49+
import Hydra.Persistence (createPersistenceIncremental)
4950

5051
data ConfigurationException
5152
= ConfigurationException ProtocolParametersConversionError

‎hydra-node/src/Hydra/Persistence.hs

+1-65
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@ module Hydra.Persistence where
44

55
import Hydra.Prelude
66

7-
import Control.Concurrent.Class.MonadSTM (newTVarIO, readTVarIO, swapTVar, throwSTM, writeTVar)
7+
import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar)
88
import Control.Monad.Class.MonadFork (myThreadId)
99
import Data.Aeson qualified as Aeson
1010
import Data.ByteString qualified as BS
1111
import Data.ByteString.Char8 qualified as C8
12-
import Hydra.Chain (IsChainState)
13-
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
14-
import Hydra.HeadLogic (StateChanged)
1512
import System.Directory (createDirectoryIfMissing, doesFileExist)
1613
import System.FilePath (takeDirectory)
1714
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)
@@ -97,64 +94,3 @@ createPersistenceIncremental fp = do
9794
Left e -> throwIO $ PersistenceException e
9895
Right decoded -> pure decoded
9996
}
100-
101-
-- * Event Source / Sink interface
102-
103-
-- TODO: document
104-
data PersistedStateChange tx
105-
= Legacy (StateChanged tx)
106-
| New (StateEvent tx)
107-
deriving stock (Generic, Show, Eq)
108-
109-
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
110-
toJSON = \case
111-
Legacy sc -> toJSON sc
112-
New e -> toJSON e
113-
114-
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
115-
parseJSON v =
116-
New <$> parseJSON v
117-
<|> Legacy <$> parseJSON v
118-
119-
-- | Define an event source and sink from a persistence handle.
120-
eventPairFromPersistenceIncremental ::
121-
(IsChainState tx, MonadSTM m) =>
122-
PersistenceIncremental (PersistedStateChange tx) m ->
123-
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
124-
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
125-
eventIdV <- newTVarIO Nothing
126-
let
127-
getLastSeenEventId = readTVar eventIdV
128-
129-
setLastSeenEventId StateEvent{eventId} = do
130-
writeTVar eventIdV (Just eventId)
131-
132-
getNextEventId =
133-
maybe 0 (+ 1) <$> readTVar eventIdV
134-
135-
-- Keep track of the last seen event id when loading
136-
getEvents = do
137-
items <- loadAll
138-
atomically . forM items $ \i -> do
139-
event <- case i of
140-
New e -> pure e
141-
Legacy sc -> do
142-
eventId <- getNextEventId
143-
pure $ StateEvent eventId sc
144-
145-
setLastSeenEventId event
146-
pure event
147-
148-
-- Filter events that are already stored
149-
putEvent e@StateEvent{eventId} = do
150-
atomically getLastSeenEventId >>= \case
151-
Nothing -> store e
152-
Just lastSeenEventId
153-
| eventId > lastSeenEventId -> store e
154-
| otherwise -> pure ()
155-
156-
store e = do
157-
append (New e)
158-
atomically $ setLastSeenEventId e
159-
160-
pure (EventSource{getEvents}, EventSink{putEvent})

‎hydra-node/test/Hydra/BehaviorSpec.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import Hydra.Chain.Direct.State (ChainStateAt (..))
3939
import Hydra.ContestationPeriod (ContestationPeriod (UnsafeContestationPeriod), toNominalDiffTime)
4040
import Hydra.Crypto (HydraKey, aggregate, sign)
4141
import Hydra.Environment (Environment (..))
42+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
4243
import Hydra.HeadLogic (
4344
Effect (..),
4445
HeadState (..),
@@ -64,7 +65,6 @@ import Hydra.Node (
6465
import Hydra.Node.InputQueue (InputQueue (enqueue), createInputQueue)
6566
import Hydra.NodeSpec (createPersistenceInMemory)
6667
import Hydra.Party (Party (..), deriveParty)
67-
import Hydra.Persistence (eventPairFromPersistenceIncremental)
6868
import Hydra.Snapshot (Snapshot (..), SnapshotNumber, getSnapshot)
6969
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, deriveOnChainId, testHeadId, testHeadSeed)
7070
import Test.Util (shouldBe, shouldNotBe, shouldRunInSim, traceInIOSim)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
-- | Tests for the 'EventSource' and 'EventSink' implementation in 'Hydra.Events.FileBased'.
2+
module Hydra.Events.FileBasedSpec where
3+
4+
import Hydra.Prelude hiding (label)
5+
import Test.Hydra.Prelude
6+
7+
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), getEvents, putEvent)
8+
import Hydra.Ledger.Simple (SimpleTx)
9+
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental)
10+
import Test.QuickCheck (forAllShrink, ioProperty, sublistOf, (===))
11+
import Test.QuickCheck.Gen (listOf)
12+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
13+
14+
spec :: Spec
15+
spec = do
16+
describe "eventPairFromPersistenceIncremental" $ do
17+
prop "can handle continuous events" $
18+
forAllShrink genContinuousEvents shrink $ \events ->
19+
ioProperty $ do
20+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
21+
forM_ events putEvent
22+
loadedEvents <- getEvents
23+
pure $
24+
loadedEvents === events
25+
26+
prop "can handle non-continuous events" $
27+
forAllShrink (sublistOf =<< genContinuousEvents) shrink $ \events ->
28+
ioProperty $ do
29+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
30+
forM_ events putEvent
31+
loadedEvents <- getEvents
32+
pure $
33+
loadedEvents === events
34+
35+
prop "can handle duplicate events" $
36+
forAllShrink genContinuousEvents shrink $ \events ->
37+
ioProperty $
38+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
39+
-- Put some events
40+
forM_ events putEvent
41+
loadedEvents <- getEvents
42+
-- Put the loaded events again (as the node would do)
43+
forM_ loadedEvents putEvent
44+
allEvents <- getEvents
45+
pure $
46+
allEvents === loadedEvents
47+
48+
prop "can bootstrap from plain StateChanged events" $
49+
forAllShrink genContinuousEvents shrink $ \events -> do
50+
ioProperty $ do
51+
withTempDir "hydra-persistence" $ \tmpDir -> do
52+
-- Store state changes directly (legacy)
53+
let stateChanges = map stateChanged events
54+
PersistenceIncremental{append} <- createPersistenceIncremental (tmpDir <> "/data")
55+
forM_ stateChanges append
56+
-- Load and store events through the event source interface
57+
(EventSource{getEvents}, EventSink{putEvent}) <-
58+
eventPairFromPersistenceIncremental
59+
=<< createPersistenceIncremental (tmpDir <> "/data")
60+
loadedEvents <- getEvents
61+
-- Store all loaded events like the node would do
62+
forM_ loadedEvents putEvent
63+
pure $
64+
map stateChanged loadedEvents === stateChanges
65+
66+
genContinuousEvents :: Gen [StateEvent SimpleTx]
67+
genContinuousEvents =
68+
zipWith StateEvent [0 ..] <$> listOf arbitrary
69+
70+
withEventSourceAndSink :: (EventSource (StateEvent SimpleTx) IO -> EventSink (StateEvent SimpleTx) IO -> IO b) -> IO b
71+
withEventSourceAndSink action =
72+
withTempDir "hydra-persistence" $ \tmpDir -> do
73+
(eventSource, eventSink) <-
74+
eventPairFromPersistenceIncremental
75+
=<< createPersistenceIncremental (tmpDir <> "/data")
76+
action eventSource eventSink

‎hydra-node/test/Hydra/NodeSpec.hs

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Hydra.Crypto (HydraKey, sign)
1616
import Hydra.Environment (Environment (..))
1717
import Hydra.Environment qualified as Environment
1818
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), genStateEvent, getEventId)
19+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
1920
import Hydra.HeadLogic (Input (..), defaultTTL)
2021
import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged)
2122
import Hydra.HeadLogicSpec (inInitialState, testSnapshot)
@@ -38,7 +39,7 @@ import Hydra.Node.InputQueue (InputQueue (..))
3839
import Hydra.Node.ParameterMismatch (ParameterMismatch (..))
3940
import Hydra.Options (defaultContestationPeriod)
4041
import Hydra.Party (Party, deriveParty)
41-
import Hydra.Persistence (PersistenceIncremental (..), eventPairFromPersistenceIncremental)
42+
import Hydra.Persistence (PersistenceIncremental (..))
4243
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, carol, carolSk, cperiod, deriveOnChainId, testEnvironment, testHeadId, testHeadSeed)
4344
import Test.QuickCheck (classify, counterexample, elements, forAllBlind, forAllShrink, forAllShrinkBlind, idempotentIOProperty, listOf, listOf1, resize, (==>))
4445
import Test.Util (isStrictlyMonotonic)
@@ -161,7 +162,7 @@ spec = parallel $ do
161162

162163
getServerOutputs >>= (`shouldContain` [TxValid{headId = testHeadId, transaction = tx1}])
163164

164-
-- Ensures that event ids are corecctly hydrate
165+
-- Ensures that event ids are correctly loaded in hydrate
165166
events <- getRecordedEvents
166167
getEventId <$> events `shouldSatisfy` isStrictlyMonotonic
167168

0 commit comments

Comments
 (0)
Please sign in to comment.