Skip to content

Commit 5b3f2a4

Browse files
committed
Move eventPairFromPersistenceIncremental to Hydra.Events.FileBased
This implementation and its tests will serve as an example for an EventSource and EventSink.
1 parent 00d595e commit 5b3f2a4

File tree

9 files changed

+176
-137
lines changed

9 files changed

+176
-137
lines changed

hydra-node/hydra-node.cabal

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ library
6969
Hydra.Crypto
7070
Hydra.Environment
7171
Hydra.Events
72+
Hydra.Events.FileBased
7273
Hydra.HeadId
7374
Hydra.HeadLogic
7475
Hydra.HeadLogic.Error
@@ -292,6 +293,7 @@ test-suite tests
292293
Hydra.Chain.Direct.WalletSpec
293294
Hydra.ContestationPeriodSpec
294295
Hydra.CryptoSpec
296+
Hydra.Events.FileBasedSpec
295297
Hydra.FireForgetSpec
296298
Hydra.HeadLogicSnapshotSpec
297299
Hydra.HeadLogicSpec

hydra-node/src/Hydra/Events.hs

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
55
-- main 'HydraNode' handle to load and send out events.
66
--
7-
-- TODO: add an example event source sink (on top of the persistence one)
7+
-- See 'Hydra.Events.FileBased' for an example implementation and
8+
-- 'Hydra.Events.FileBasedSpec' for the corresponding test suite.
9+
--
10+
-- Custom implementations should be located under Hydra.Events to avoid
11+
-- conflicts.
812
module Hydra.Events where
913

1014
import Hydra.Prelude
+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
-- | A file-based event source and sink using JSON encoding.
2+
--
3+
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
4+
module Hydra.Events.FileBased where
5+
6+
import Hydra.Prelude
7+
8+
import Hydra.Chain (IsChainState)
9+
import Hydra.Persistence (PersistenceIncremental (..))
10+
import Hydra.Events (EventSource (..), StateEvent (..), EventSink (..))
11+
import Hydra.HeadLogic.Outcome (StateChanged)
12+
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
13+
14+
-- | A basic file based event source and sink defined using an
15+
-- 'PersistenceIncremental' handle.
16+
--
17+
-- The complexity in this implementation mostly stems from the fact that we want
18+
-- to be backward-compatible with the old, plain format of storing
19+
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
20+
--
21+
-- If any 'Legacy StateChanged' items are discovered, a running index is used
22+
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
23+
--
24+
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
25+
-- be defined more generically with constraints:
26+
--
27+
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
28+
eventPairFromPersistenceIncremental ::
29+
(IsChainState tx, MonadSTM m) =>
30+
PersistenceIncremental (PersistedStateChange tx) m ->
31+
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
32+
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
33+
eventIdV <- newTVarIO Nothing
34+
let
35+
getLastSeenEventId = readTVar eventIdV
36+
37+
setLastSeenEventId StateEvent{eventId} = do
38+
writeTVar eventIdV (Just eventId)
39+
40+
getNextEventId =
41+
maybe 0 (+ 1) <$> readTVar eventIdV
42+
43+
-- Keep track of the last seen event id when loading
44+
getEvents = do
45+
items <- loadAll
46+
atomically . forM items $ \i -> do
47+
event <- case i of
48+
New e -> pure e
49+
Legacy sc -> do
50+
eventId <- getNextEventId
51+
pure $ StateEvent eventId sc
52+
53+
setLastSeenEventId event
54+
pure event
55+
56+
-- Filter events that are already stored
57+
putEvent e@StateEvent{eventId} = do
58+
atomically getLastSeenEventId >>= \case
59+
Nothing -> store e
60+
Just lastSeenEventId
61+
| eventId > lastSeenEventId -> store e
62+
| otherwise -> pure ()
63+
64+
store e = do
65+
append (New e)
66+
atomically $ setLastSeenEventId e
67+
68+
pure (EventSource{getEvents}, EventSink{putEvent})
69+
70+
-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
71+
-- compatible with plain usage of 'PersistenceIncrementa' using plain
72+
-- 'StateChanged' items to the new 'StateEvent' persisted items.
73+
data PersistedStateChange tx
74+
= Legacy (StateChanged tx)
75+
| New (StateEvent tx)
76+
deriving stock (Generic, Show, Eq)
77+
78+
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
79+
toJSON = \case
80+
Legacy sc -> toJSON sc
81+
New e -> toJSON e
82+
83+
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
84+
parseJSON v =
85+
New <$> parseJSON v
86+
<|> Legacy <$> parseJSON v

hydra-node/src/Hydra/Node/Run.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import Hydra.Chain.Direct (loadChainContext, mkTinyWallet, withDirectChain)
1313
import Hydra.Chain.Direct.State (initialChainState)
1414
import Hydra.Chain.Offline (loadGenesisFile, withOfflineChain)
1515
import Hydra.Environment (Environment (..))
16+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
1617
import Hydra.Ledger.Cardano qualified as Ledger
1718
import Hydra.Ledger.Cardano.Configuration (
1819
Globals,
@@ -45,7 +46,7 @@ import Hydra.Options (
4546
RunOptions (..),
4647
validateRunOptions,
4748
)
48-
import Hydra.Persistence (createPersistenceIncremental, eventPairFromPersistenceIncremental)
49+
import Hydra.Persistence (createPersistenceIncremental)
4950

5051
data ConfigurationException
5152
= ConfigurationException ProtocolParametersConversionError

hydra-node/src/Hydra/Persistence.hs

-67
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ import Control.Monad.Class.MonadFork (myThreadId)
99
import Data.Aeson qualified as Aeson
1010
import Data.ByteString qualified as BS
1111
import Data.ByteString.Char8 qualified as C8
12-
import Hydra.Chain (IsChainState)
13-
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
14-
import Hydra.HeadLogic (StateChanged)
1512
import System.Directory (createDirectoryIfMissing, doesFileExist)
1613
import System.FilePath (takeDirectory)
1714
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)
@@ -97,67 +94,3 @@ createPersistenceIncremental fp = do
9794
Left e -> throwIO $ PersistenceException e
9895
Right decoded -> pure decoded
9996
}
100-
101-
-- * Event Source / Sink interface
102-
103-
-- | Data type used by 'createJSONFileEventSourceAndSink' to be used when
104-
-- upgrading from the legacy 'StateChanged' to the new 'StateEvent' persisted
105-
-- items.
106-
data PersistedStateChange tx
107-
= Legacy (StateChanged tx)
108-
| New (StateEvent tx)
109-
deriving stock (Generic, Show, Eq)
110-
111-
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
112-
toJSON = \case
113-
Legacy sc -> toJSON sc
114-
New e -> toJSON e
115-
116-
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
117-
parseJSON v =
118-
New <$> parseJSON v
119-
<|> Legacy <$> parseJSON v
120-
121-
-- | A basic file based event source and sink defined using an
122-
-- 'PersistenceIncremental' handle.
123-
eventPairFromPersistenceIncremental ::
124-
(IsChainState tx, MonadSTM m) =>
125-
PersistenceIncremental (PersistedStateChange tx) m ->
126-
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
127-
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
128-
eventIdV <- newTVarIO Nothing
129-
let
130-
getLastSeenEventId = readTVar eventIdV
131-
132-
setLastSeenEventId StateEvent{eventId} = do
133-
writeTVar eventIdV (Just eventId)
134-
135-
getNextEventId =
136-
maybe 0 (+ 1) <$> readTVar eventIdV
137-
138-
-- Keep track of the last seen event id when loading
139-
getEvents = do
140-
items <- loadAll
141-
atomically . forM items $ \i -> do
142-
event <- case i of
143-
New e -> pure e
144-
Legacy sc -> do
145-
eventId <- getNextEventId
146-
pure $ StateEvent eventId sc
147-
148-
setLastSeenEventId event
149-
pure event
150-
151-
-- Filter events that are already stored
152-
putEvent e@StateEvent{eventId} = do
153-
atomically getLastSeenEventId >>= \case
154-
Nothing -> store e
155-
Just lastSeenEventId
156-
| eventId > lastSeenEventId -> store e
157-
| otherwise -> pure ()
158-
159-
store e = do
160-
append (New e)
161-
atomically $ setLastSeenEventId e
162-
163-
pure (EventSource{getEvents}, EventSink{putEvent})

hydra-node/test/Hydra/BehaviorSpec.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import Hydra.Chain.Direct.State (ChainStateAt (..))
3939
import Hydra.ContestationPeriod (ContestationPeriod (UnsafeContestationPeriod), toNominalDiffTime)
4040
import Hydra.Crypto (HydraKey, aggregate, sign)
4141
import Hydra.Environment (Environment (..))
42+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
4243
import Hydra.HeadLogic (
4344
Effect (..),
4445
HeadState (..),
@@ -64,7 +65,6 @@ import Hydra.Node (
6465
import Hydra.Node.InputQueue (InputQueue (enqueue), createInputQueue)
6566
import Hydra.NodeSpec (createPersistenceInMemory)
6667
import Hydra.Party (Party (..), deriveParty)
67-
import Hydra.Persistence (eventPairFromPersistenceIncremental)
6868
import Hydra.Snapshot (Snapshot (..), SnapshotNumber, getSnapshot)
6969
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, deriveOnChainId, testHeadId, testHeadSeed)
7070
import Test.Util (shouldBe, shouldNotBe, shouldRunInSim, traceInIOSim)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
-- | Tests for the 'EventSource' and 'EventSink' implementation in 'Hydra.Events.FileBased'.
2+
module Hydra.Events.FileBasedSpec where
3+
4+
import Hydra.Prelude hiding (label)
5+
import Test.Hydra.Prelude
6+
7+
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), getEvents, putEvent)
8+
import Hydra.Ledger.Simple (SimpleTx)
9+
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental)
10+
import Test.QuickCheck (forAllShrink, ioProperty, sublistOf, (===))
11+
import Test.QuickCheck.Gen (listOf)
12+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
13+
14+
spec :: Spec
15+
spec = do
16+
describe "eventPairFromPersistenceIncremental" $ do
17+
prop "can handle continuous events" $
18+
forAllShrink genContinuousEvents shrink $ \events ->
19+
ioProperty $ do
20+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
21+
forM_ events putEvent
22+
loadedEvents <- getEvents
23+
pure $
24+
loadedEvents === events
25+
26+
prop "can handle non-continuous events" $
27+
forAllShrink (sublistOf =<< genContinuousEvents) shrink $ \events ->
28+
ioProperty $ do
29+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
30+
forM_ events putEvent
31+
loadedEvents <- getEvents
32+
pure $
33+
loadedEvents === events
34+
35+
prop "can handle duplicate events" $
36+
forAllShrink genContinuousEvents shrink $ \events ->
37+
ioProperty $
38+
withEventSourceAndSink $ \EventSource{getEvents} EventSink{putEvent} -> do
39+
-- Put some events
40+
forM_ events putEvent
41+
loadedEvents <- getEvents
42+
-- Put the loaded events again (as the node would do)
43+
forM_ loadedEvents putEvent
44+
allEvents <- getEvents
45+
pure $
46+
allEvents === loadedEvents
47+
48+
prop "can bootstrap from plain StateChanged events" $
49+
forAllShrink genContinuousEvents shrink $ \events -> do
50+
ioProperty $ do
51+
withTempDir "hydra-persistence" $ \tmpDir -> do
52+
-- Store state changes directly (legacy)
53+
let stateChanges = map stateChanged events
54+
PersistenceIncremental{append} <- createPersistenceIncremental (tmpDir <> "/data")
55+
forM_ stateChanges append
56+
-- Load and store events through the event source interface
57+
(EventSource{getEvents}, EventSink{putEvent}) <-
58+
eventPairFromPersistenceIncremental
59+
=<< createPersistenceIncremental (tmpDir <> "/data")
60+
loadedEvents <- getEvents
61+
-- Store all loaded events like the node would do
62+
forM_ loadedEvents putEvent
63+
pure $
64+
map stateChanged loadedEvents === stateChanges
65+
66+
genContinuousEvents :: Gen [StateEvent SimpleTx]
67+
genContinuousEvents =
68+
zipWith StateEvent [0 ..] <$> listOf arbitrary
69+
70+
withEventSourceAndSink :: (EventSource (StateEvent SimpleTx) IO -> EventSink (StateEvent SimpleTx) IO -> IO b) -> IO b
71+
withEventSourceAndSink action =
72+
withTempDir "hydra-persistence" $ \tmpDir -> do
73+
(eventSource, eventSink) <-
74+
eventPairFromPersistenceIncremental
75+
=<< createPersistenceIncremental (tmpDir <> "/data")
76+
action eventSource eventSink

hydra-node/test/Hydra/NodeSpec.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Hydra.Crypto (HydraKey, sign)
1616
import Hydra.Environment (Environment (..))
1717
import Hydra.Environment qualified as Environment
1818
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..), genStateEvent, getEventId)
19+
import Hydra.Events.FileBased (eventPairFromPersistenceIncremental)
1920
import Hydra.HeadLogic (Input (..), defaultTTL)
2021
import Hydra.HeadLogic.Outcome (StateChanged (HeadInitialized), genStateChanged)
2122
import Hydra.HeadLogicSpec (inInitialState, testSnapshot)
@@ -38,7 +39,7 @@ import Hydra.Node.InputQueue (InputQueue (..))
3839
import Hydra.Node.ParameterMismatch (ParameterMismatch (..))
3940
import Hydra.Options (defaultContestationPeriod)
4041
import Hydra.Party (Party, deriveParty)
41-
import Hydra.Persistence (PersistenceIncremental (..), eventPairFromPersistenceIncremental)
42+
import Hydra.Persistence (PersistenceIncremental (..))
4243
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, carol, carolSk, cperiod, deriveOnChainId, testEnvironment, testHeadId, testHeadSeed)
4344
import Test.QuickCheck (classify, counterexample, elements, forAllBlind, forAllShrink, forAllShrinkBlind, idempotentIOProperty, listOf, listOf1, resize, (==>))
4445
import Test.Util (isStrictlyMonotonic)

0 commit comments

Comments
 (0)