Skip to content

Commit 89f8fb5

Browse files
authoredMar 18, 2024··
Merge pull request #1351 from SundaeSwap-finance/SB-1352-persistence-types
EventSource and EventSink abstractions for Hydra Extensibility
2 parents 65e15b2 + 724f6e8 commit 89f8fb5

25 files changed

+914
-376
lines changed
 

‎CHANGELOG.md

+9-2
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,17 @@ changes.
3939
- Add support for `Conway` in `hydra-chain-observer`.
4040

4141
- **BREAKING** Change to the `hydra-node` logs, monitoring and removal of `log-filter` executable:
42-
- We renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
43-
- In course of this, we also removed the `log-filter` executable as nobody is actively using it and we recommend using other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful).
42+
- Renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
43+
- Changed structure of `LogicOutcome` entries.
44+
- Added node-level log entry when an input was `DroppedFromQueue`.
45+
- In course of this, the `log-filter` executable was removed as nobody is actively using it and other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful) are recommended.
4446
- Renamed prometheus metric `hydra_head_events -> hydra_head_inputs`.
4547

48+
- Introduce `EventSource` and `EventSink` interfaces in `hydra-node`:
49+
- These handles can now be used as "extension points" to make the `hydra-node` store and load its state differently or expose `StateEvent`s to other, external services.
50+
- Internal refactoring of persistence mechanism as event source and sink in a backward-compatible way.
51+
- More details can be found in [ADR21](https://hydra.family/head-protocol/adr/21/)
52+
4653
## [0.15.0] - 2024-01-18
4754

4855
- Tested with `cardano-node 8.7.3` and `cardano-cli 8.17.0.0`.

‎docs/adr/2023-11-07_029-event-source-sink.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ slug: 29
33
title: |
44
29. EventSource & EventSink abstractions
55
authors: [cardenaso11, quantumplation, ch1bo]
6-
tags: [Draft]
6+
tags: [Accepted]
77
---
88

99
## Status
10-
Draft
10+
Accepted
1111

1212
## Context
1313

@@ -42,12 +42,12 @@ Draft
4242
```hs
4343
data HydraNode tx m = HydraNode
4444
{ -- ...
45-
, eventSource :: EventSource (StateChanged tx) m
46-
, eventSinks :: [EventSink (StateChanged tx) m]
45+
, eventSource :: EventSource (StateEvent tx) m
46+
, eventSinks :: [EventSink (StateEvent tx) m]
4747
}
4848
```
4949

50-
* The `hydra-node` will load events and __hydra_te its `HeadState` using `getEvents` of the single `eventSource`.
50+
* The `hydra-node` will load events and `hydrate` its `HeadState` using `getEvents` of the single `eventSource`.
5151

5252
* The `stepHydraNode` main loop does call `putEvent` on all `eventSinks` in sequence. Any failure will make the `hydra-node` process terminate and require a restart.
5353

‎hydra-cluster/src/Hydra/Cluster/Scenarios.hs

+1-1
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ threeNodesNoErrorsOnOpen tracer tmpDir node@RunningNode{nodeSocket} hydraScripts
559559

560560
let contestationPeriod = UnsafeContestationPeriod 2
561561
let hydraTracer = contramap FromHydraNode tracer
562-
withHydraCluster hydraTracer tmpDir nodeSocket 0 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
562+
withHydraCluster hydraTracer tmpDir nodeSocket 1 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
563563
let clients = leader : rest
564564
waitForNodesConnected hydraTracer 20 clients
565565

‎hydra-node/hydra-node.cabal

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ library
6767
Hydra.Chain.Offline
6868
Hydra.ContestationPeriod
6969
Hydra.Crypto
70+
Hydra.Environment
71+
Hydra.Events
72+
Hydra.Events.FileBased
7073
Hydra.HeadId
7174
Hydra.HeadLogic
7275
Hydra.HeadLogic.Error
@@ -290,6 +293,7 @@ test-suite tests
290293
Hydra.Chain.Direct.WalletSpec
291294
Hydra.ContestationPeriodSpec
292295
Hydra.CryptoSpec
296+
Hydra.Events.FileBasedSpec
293297
Hydra.FireForgetSpec
294298
Hydra.HeadLogicSnapshotSpec
295299
Hydra.HeadLogicSpec

‎hydra-node/json-schemas/logs.yaml

+23-4
Original file line numberDiff line numberDiff line change
@@ -876,6 +876,25 @@ definitions:
876876
The Party emitting the log entry.
877877
outcome:
878878
$ref: "logs.yaml#/definitions/Outcome"
879+
- title: DroppedFromQueue
880+
description: >-
881+
An input event has been dropped from the input queue as its
882+
time-to-live ran out.
883+
type: object
884+
additionalProperties: false
885+
required:
886+
- tag
887+
- inputId
888+
- input
889+
properties:
890+
tag:
891+
type: string
892+
enum: ["DroppedFromQueue"]
893+
inputId:
894+
type: integer
895+
minimum: 0
896+
input:
897+
"$ref": "logs.yaml#/definitions/Input"
879898
- title: LoadedState
880899
description: >-
881900
Loaded state events from persistence.
@@ -1909,15 +1928,15 @@ definitions:
19091928
additionalProperties: false
19101929
required:
19111930
- tag
1912-
- events
1931+
- stateChanges
19131932
- effects
19141933
description: >-
19151934
Continue with the given state update events and side effects.
19161935
properties:
19171936
tag:
19181937
type: string
19191938
enum: ["Continue"]
1920-
events:
1939+
stateChanges:
19211940
type: array
19221941
items:
19231942
type: object
@@ -1933,7 +1952,7 @@ definitions:
19331952
required:
19341953
- tag
19351954
- reason
1936-
- events
1955+
- stateChanges
19371956
description: >-
19381957
Wait for some condition to be met with optional state updates.
19391958
properties:
@@ -1943,7 +1962,7 @@ definitions:
19431962
reason:
19441963
type: object
19451964
$ref: "logs.yaml#/definitions/WaitReason"
1946-
events:
1965+
stateChanges:
19471966
type: array
19481967
items:
19491968
type: object

‎hydra-node/src/Hydra/Chain.hs

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import Hydra.Cardano.Api (
2727
Witness,
2828
)
2929
import Hydra.ContestationPeriod (ContestationPeriod)
30+
import Hydra.Environment (Environment (..))
3031
import Hydra.HeadId (HeadId, HeadSeed)
3132
import Hydra.Ledger (ChainSlot, IsTx, UTxOType)
3233
import Hydra.OnChainId (OnChainId)
@@ -60,6 +61,11 @@ instance Arbitrary HeadParameters where
6061
dedupParties HeadParameters{contestationPeriod, parties} =
6162
HeadParameters{contestationPeriod, parties = nub parties}
6263

64+
-- | Make 'HeadParameters' that are consistent with the given 'Environment'.
65+
mkHeadParameters :: Environment -> HeadParameters
66+
mkHeadParameters Environment{party, otherParties, contestationPeriod} =
67+
HeadParameters{contestationPeriod, parties = party : otherParties}
68+
6369
-- | Data type used to post transactions on chain. It holds everything to
6470
-- construct corresponding Head protocol transactions.
6571
data PostChainTx tx

‎hydra-node/src/Hydra/Environment.hs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
module Hydra.Environment where
2+
3+
import Hydra.Prelude
4+
5+
import Hydra.ContestationPeriod (ContestationPeriod)
6+
import Hydra.Crypto (HydraKey, SigningKey)
7+
import Hydra.OnChainId (OnChainId)
8+
import Hydra.Party (Party, deriveParty)
9+
10+
data Environment = Environment
11+
{ party :: Party
12+
-- ^ This is the p_i from the paper
13+
, -- XXX: In the long run we would not want to keep the signing key in memory,
14+
-- i.e. have an 'Effect' for signing or so.
15+
signingKey :: SigningKey HydraKey
16+
, otherParties :: [Party]
17+
, -- XXX: Improve naming
18+
participants :: [OnChainId]
19+
, contestationPeriod :: ContestationPeriod
20+
}
21+
deriving stock (Show, Eq)
22+
23+
instance Arbitrary Environment where
24+
arbitrary = do
25+
signingKey <- arbitrary
26+
otherParties <- arbitrary
27+
participants <- arbitrary
28+
contestationPeriod <- arbitrary
29+
pure $
30+
Environment
31+
{ signingKey
32+
, party = deriveParty signingKey
33+
, otherParties
34+
, contestationPeriod
35+
, participants
36+
}

‎hydra-node/src/Hydra/Events.hs

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
-- | This module defines the types and functions for creating 'EventSource' and
2+
-- 'EventSink' instances and is intended to be used as an extension point.
3+
--
4+
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
5+
-- main 'HydraNode' handle to load and send out events.
6+
--
7+
-- See 'Hydra.Events.FileBased' for an example implementation and
8+
-- 'Hydra.Events.FileBasedSpec' for the corresponding test suite.
9+
--
10+
-- Custom implementations should be located under Hydra.Events to avoid
11+
-- conflicts.
12+
module Hydra.Events where
13+
14+
import Hydra.Prelude
15+
16+
import Hydra.Chain (IsChainState)
17+
import Hydra.HeadLogic.Outcome (StateChanged)
18+
19+
type EventId = Word64
20+
21+
class HasEventId a where
22+
getEventId :: a -> EventId
23+
24+
instance HasEventId (EventId, a) where
25+
getEventId = fst
26+
27+
newtype EventSource e m = EventSource
28+
{ getEvents :: HasEventId e => m [e]
29+
-- ^ Retrieve all events from the event source.
30+
}
31+
32+
newtype EventSink e m = EventSink
33+
{ putEvent :: HasEventId e => e -> m ()
34+
-- ^ Send a single event to the event sink.
35+
}
36+
37+
-- | Put a list of events to a list of event sinks in a round-robin fashion.
38+
putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
39+
putEventsToSinks sinks events =
40+
forM_ events $ \event ->
41+
forM_ sinks $ \sink ->
42+
putEvent sink event
43+
44+
-- * State change events as used by Hydra.Node
45+
46+
-- | A state change event with an event id that is the common entity to be
47+
-- loaded from an 'EventSource' and sent to 'EventSink's.
48+
data StateEvent tx = StateEvent
49+
{ eventId :: EventId
50+
, stateChanged :: StateChanged tx
51+
}
52+
deriving (Generic)
53+
54+
instance HasEventId (StateEvent tx) where
55+
getEventId = eventId
56+
57+
deriving instance IsChainState tx => Show (StateEvent tx)
58+
deriving instance IsChainState tx => Eq (StateEvent tx)
59+
deriving instance IsChainState tx => ToJSON (StateEvent tx)
60+
deriving instance IsChainState tx => FromJSON (StateEvent tx)
61+
62+
instance IsChainState tx => Arbitrary (StateEvent tx) where
63+
arbitrary = genericArbitrary
64+
shrink = genericShrink
65+
66+
genStateEvent :: StateChanged tx -> Gen (StateEvent tx)
67+
genStateEvent sc = StateEvent <$> arbitrary <*> pure sc
+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
-- | A file-based event source and sink using JSON encoding.
2+
--
3+
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
4+
module Hydra.Events.FileBased where
5+
6+
import Hydra.Prelude
7+
8+
import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
9+
import Hydra.Chain (IsChainState)
10+
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
11+
import Hydra.HeadLogic.Outcome (StateChanged)
12+
import Hydra.Persistence (PersistenceIncremental (..))
13+
14+
-- | A basic file based event source and sink defined using an
15+
-- 'PersistenceIncremental' handle.
16+
--
17+
-- The complexity in this implementation mostly stems from the fact that we want
18+
-- to be backward-compatible with the old, plain format of storing
19+
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
20+
--
21+
-- If any 'Legacy StateChanged' items are discovered, a running index is used
22+
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
23+
--
24+
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
25+
-- be defined more generically with constraints:
26+
--
27+
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
28+
eventPairFromPersistenceIncremental ::
29+
(IsChainState tx, MonadSTM m) =>
30+
PersistenceIncremental (PersistedStateChange tx) m ->
31+
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
32+
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
33+
eventIdV <- newTVarIO Nothing
34+
let
35+
getLastSeenEventId = readTVar eventIdV
36+
37+
setLastSeenEventId StateEvent{eventId} = do
38+
writeTVar eventIdV (Just eventId)
39+
40+
getNextEventId =
41+
maybe 0 (+ 1) <$> readTVar eventIdV
42+
43+
-- Keep track of the last seen event id when loading
44+
getEvents = do
45+
items <- loadAll
46+
atomically . forM items $ \i -> do
47+
event <- case i of
48+
New e -> pure e
49+
Legacy sc -> do
50+
eventId <- getNextEventId
51+
pure $ StateEvent eventId sc
52+
53+
setLastSeenEventId event
54+
pure event
55+
56+
-- Filter events that are already stored
57+
putEvent e@StateEvent{eventId} = do
58+
atomically getLastSeenEventId >>= \case
59+
Nothing -> store e
60+
Just lastSeenEventId
61+
| eventId > lastSeenEventId -> store e
62+
| otherwise -> pure ()
63+
64+
store e = do
65+
append (New e)
66+
atomically $ setLastSeenEventId e
67+
68+
pure (EventSource{getEvents}, EventSink{putEvent})
69+
70+
-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
71+
-- compatible with plain usage of 'PersistenceIncrementa' using plain
72+
-- 'StateChanged' items to the new 'StateEvent' persisted items.
73+
data PersistedStateChange tx
74+
= Legacy (StateChanged tx)
75+
| New (StateEvent tx)
76+
deriving stock (Generic, Show, Eq)
77+
78+
instance IsChainState tx => ToJSON (PersistedStateChange tx) where
79+
toJSON = \case
80+
Legacy sc -> toJSON sc
81+
New e -> toJSON e
82+
83+
instance IsChainState tx => FromJSON (PersistedStateChange tx) where
84+
parseJSON v =
85+
New <$> parseJSON v
86+
<|> Legacy <$> parseJSON v

0 commit comments

Comments
 (0)
Please sign in to comment.