Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventSource and EventSink abstractions for Hydra Extensibility #1351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
610c1c3
initial work (WIP)
cardenaso11 Jan 16, 2024
b04c996
WIP checkpoint
cardenaso11 Feb 13, 2024
0a5fb3a
checkpoint apiserver
cardenaso11 Feb 13, 2024
9d79b81
WIP: stateChangeID into StateChanged, routed into disk-persistence ev…
cardenaso11 Feb 15, 2024
b67751c
remove inaccurate fixme
cardenaso11 Feb 15, 2024
5e4c883
checkpoint: lib build
cardenaso11 Feb 19, 2024
cc32731
WIP: going back and forth on how to manage state in tests
cardenaso11 Feb 19, 2024
2738490
eventid
cardenaso11 Feb 19, 2024
d8f2579
remove newpersistence data type (in exchange for type synonym) stuff …
cardenaso11 Feb 19, 2024
7e66bc8
checkpoint for testing
cardenaso11 Feb 19, 2024
ea08a5c
some tests
cardenaso11 Feb 20, 2024
b52e547
checkpoint: fix all hydra-node tests besides golden
cardenaso11 Feb 21, 2024
92aa406
Use eventPairFromPersistenceIncremental
cardenaso11 Mar 4, 2024
2c664d9
Use the original PersistenceIncremental in withAPIServer
ch1bo Mar 4, 2024
f704f1f
Only use eventPairFromPersistenceIncremental
ch1bo Mar 4, 2024
dbedcde
remove NewPersistenceIncremental type, use eventSource + eventSinks d…
cardenaso11 Mar 4, 2024
2f52e26
Move EventSource and EventSink definitions into Hydra.Events
ch1bo Mar 4, 2024
6f31865
Add HasEventId class to ensure events can be identified
ch1bo Mar 4, 2024
fae02f9
Draft extension points in Hydra.Node.Run
ch1bo Mar 4, 2024
8ba7990
Rename getEvents' -> getEvents and putEvent' -> putEvent
ch1bo Mar 4, 2024
a6f29cd
Draft a new way to wire HydraNode in Node.Run
ch1bo Mar 11, 2024
2e7df0b
Add node-level tests for event source / sink handling
ch1bo Mar 11, 2024
738ebcc
Unify Hydra.Node function signatures and refine NodeSpec
ch1bo Mar 11, 2024
ca28b3e
Move Environment into dedicated module
ch1bo Mar 12, 2024
20072e2
Generate compatible state change events when testing hydrate
ch1bo Mar 12, 2024
3944807
Add more tests on event ids being strictly monotonic
ch1bo Mar 12, 2024
42b3048
Not re-enqueue forever in stepHydraNode
ch1bo Mar 12, 2024
cbd09fe
Create a StateEvent type and keep track of nextEventId in NodeState
ch1bo Mar 12, 2024
3b587e4
Add tests to assert de-duplication on PersistenceIncremental event so…
ch1bo Mar 13, 2024
995376c
Move eventPairFromPersistenceIncremental to Hydra.Events.FileBased
ch1bo Mar 13, 2024
724f6e8
Update changelog and mark ADR as accepted
ch1bo Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@ changes.
- Add support for `Conway` in `hydra-chain-observer`.

- **BREAKING** Change to the `hydra-node` logs, monitoring and removal of `log-filter` executable:
- We renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
- In course of this, we also removed the `log-filter` executable as nobody is actively using it and we recommend using other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful).
- Renamed the `Event` data types to `Input` and consequently log items like `BeginEvent` to `BeginInput`.
- Changed structure of `LogicOutcome` entries.
- Added node-level log entry when an input was `DroppedFromQueue`.
- In course of this, the `log-filter` executable was removed as nobody is actively using it and other off-the-shelf utilities to manipulate structured JSON logs (`jq` is already quite powerful) are recommended.
- Renamed prometheus metric `hydra_head_events -> hydra_head_inputs`.

- Introduce `EventSource` and `EventSink` interfaces in `hydra-node`:
- These handles can now be used as "extension points" to make the `hydra-node` store and load its state differently or expose `StateEvent`s to other, external services.
- Internal refactoring of persistence mechanism as event source and sink in a backward-compatible way.
- More details can be found in [ADR21](https://hydra.family/head-protocol/adr/21/)

## [0.15.0] - 2024-01-18

- Tested with `cardano-node 8.7.3` and `cardano-cli 8.17.0.0`.
Expand Down
10 changes: 5 additions & 5 deletions docs/adr/2023-11-07_029-event-source-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ slug: 29
title: |
29. EventSource & EventSink abstractions
authors: [cardenaso11, quantumplation, ch1bo]
tags: [Draft]
tags: [Accepted]
---

## Status
Draft
Accepted

## Context

Expand Down Expand Up @@ -42,12 +42,12 @@ Draft
```hs
data HydraNode tx m = HydraNode
{ -- ...
, eventSource :: EventSource (StateChanged tx) m
, eventSinks :: [EventSink (StateChanged tx) m]
, eventSource :: EventSource (StateEvent tx) m
, eventSinks :: [EventSink (StateEvent tx) m]
}
```

* The `hydra-node` will load events and __hydra_te its `HeadState` using `getEvents` of the single `eventSource`.
* The `hydra-node` will load events and `hydrate` its `HeadState` using `getEvents` of the single `eventSource`.

* The `stepHydraNode` main loop does call `putEvent` on all `eventSinks` in sequence. Any failure will make the `hydra-node` process terminate and require a restart.

Expand Down
2 changes: 1 addition & 1 deletion hydra-cluster/src/Hydra/Cluster/Scenarios.hs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ threeNodesNoErrorsOnOpen tracer tmpDir node@RunningNode{nodeSocket} hydraScripts

let contestationPeriod = UnsafeContestationPeriod 2
let hydraTracer = contramap FromHydraNode tracer
withHydraCluster hydraTracer tmpDir nodeSocket 0 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
withHydraCluster hydraTracer tmpDir nodeSocket 1 cardanoKeys hydraKeys hydraScriptsTxId contestationPeriod $ \(leader :| rest) -> do
let clients = leader : rest
waitForNodesConnected hydraTracer 20 clients

Expand Down
4 changes: 4 additions & 0 deletions hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ library
Hydra.Chain.Offline
Hydra.ContestationPeriod
Hydra.Crypto
Hydra.Environment
Hydra.Events
Hydra.Events.FileBased
Hydra.HeadId
Hydra.HeadLogic
Hydra.HeadLogic.Error
Expand Down Expand Up @@ -290,6 +293,7 @@ test-suite tests
Hydra.Chain.Direct.WalletSpec
Hydra.ContestationPeriodSpec
Hydra.CryptoSpec
Hydra.Events.FileBasedSpec
Hydra.FireForgetSpec
Hydra.HeadLogicSnapshotSpec
Hydra.HeadLogicSpec
Expand Down
27 changes: 23 additions & 4 deletions hydra-node/json-schemas/logs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,25 @@ definitions:
The Party emitting the log entry.
outcome:
$ref: "logs.yaml#/definitions/Outcome"
- title: DroppedFromQueue
description: >-
An input event has been dropped from the input queue as its
time-to-live ran out.
type: object
additionalProperties: false
required:
- tag
- inputId
- input
properties:
tag:
type: string
enum: ["DroppedFromQueue"]
inputId:
type: integer
minimum: 0
input:
"$ref": "logs.yaml#/definitions/Input"
- title: LoadedState
description: >-
Loaded state events from persistence.
Expand Down Expand Up @@ -1909,15 +1928,15 @@ definitions:
additionalProperties: false
required:
- tag
- events
- stateChanges
- effects
description: >-
Continue with the given state update events and side effects.
properties:
tag:
type: string
enum: ["Continue"]
events:
stateChanges:
type: array
items:
type: object
Expand All @@ -1933,7 +1952,7 @@ definitions:
required:
- tag
- reason
- events
- stateChanges
description: >-
Wait for some condition to be met with optional state updates.
properties:
Expand All @@ -1943,7 +1962,7 @@ definitions:
reason:
type: object
$ref: "logs.yaml#/definitions/WaitReason"
events:
stateChanges:
type: array
items:
type: object
Expand Down
6 changes: 6 additions & 0 deletions hydra-node/src/Hydra/Chain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Hydra.Cardano.Api (
Witness,
)
import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Environment (Environment (..))
import Hydra.HeadId (HeadId, HeadSeed)
import Hydra.Ledger (ChainSlot, IsTx, UTxOType)
import Hydra.OnChainId (OnChainId)
Expand Down Expand Up @@ -60,6 +61,11 @@ instance Arbitrary HeadParameters where
dedupParties HeadParameters{contestationPeriod, parties} =
HeadParameters{contestationPeriod, parties = nub parties}

-- | Make 'HeadParameters' that are consistent with the given 'Environment'.
mkHeadParameters :: Environment -> HeadParameters
mkHeadParameters Environment{party, otherParties, contestationPeriod} =
HeadParameters{contestationPeriod, parties = party : otherParties}

-- | Data type used to post transactions on chain. It holds everything to
-- construct corresponding Head protocol transactions.
data PostChainTx tx
Expand Down
36 changes: 36 additions & 0 deletions hydra-node/src/Hydra/Environment.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Hydra.Environment where

import Hydra.Prelude

import Hydra.ContestationPeriod (ContestationPeriod)
import Hydra.Crypto (HydraKey, SigningKey)
import Hydra.OnChainId (OnChainId)
import Hydra.Party (Party, deriveParty)

data Environment = Environment
{ party :: Party
-- ^ This is the p_i from the paper
, -- XXX: In the long run we would not want to keep the signing key in memory,
-- i.e. have an 'Effect' for signing or so.
signingKey :: SigningKey HydraKey
, otherParties :: [Party]
, -- XXX: Improve naming
participants :: [OnChainId]
, contestationPeriod :: ContestationPeriod
}
deriving stock (Show, Eq)

instance Arbitrary Environment where
arbitrary = do
signingKey <- arbitrary
otherParties <- arbitrary
participants <- arbitrary
contestationPeriod <- arbitrary
pure $
Environment
{ signingKey
, party = deriveParty signingKey
, otherParties
, contestationPeriod
, participants
}
67 changes: 67 additions & 0 deletions hydra-node/src/Hydra/Events.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
-- | This module defines the types and functions for creating 'EventSource' and
-- 'EventSink' instances and is intended to be used as an extension point.
--
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
-- main 'HydraNode' handle to load and send out events.
--
-- See 'Hydra.Events.FileBased' for an example implementation and
-- 'Hydra.Events.FileBasedSpec' for the corresponding test suite.
--
-- Custom implementations should be located under Hydra.Events to avoid
-- conflicts.
module Hydra.Events where

import Hydra.Prelude

import Hydra.Chain (IsChainState)
import Hydra.HeadLogic.Outcome (StateChanged)

type EventId = Word64

class HasEventId a where
getEventId :: a -> EventId

instance HasEventId (EventId, a) where
getEventId = fst

newtype EventSource e m = EventSource
{ getEvents :: HasEventId e => m [e]
-- ^ Retrieve all events from the event source.
}

newtype EventSink e m = EventSink
{ putEvent :: HasEventId e => e -> m ()
-- ^ Send a single event to the event sink.
}

-- | Put a list of events to a list of event sinks in a round-robin fashion.
putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks events =
forM_ events $ \event ->
forM_ sinks $ \sink ->
putEvent sink event

-- * State change events as used by Hydra.Node

-- | A state change event with an event id that is the common entity to be
-- loaded from an 'EventSource' and sent to 'EventSink's.
data StateEvent tx = StateEvent
{ eventId :: EventId
, stateChanged :: StateChanged tx
}
deriving (Generic)

instance HasEventId (StateEvent tx) where
getEventId = eventId

deriving instance IsChainState tx => Show (StateEvent tx)
deriving instance IsChainState tx => Eq (StateEvent tx)
deriving instance IsChainState tx => ToJSON (StateEvent tx)
deriving instance IsChainState tx => FromJSON (StateEvent tx)

instance IsChainState tx => Arbitrary (StateEvent tx) where
arbitrary = genericArbitrary
shrink = genericShrink

genStateEvent :: StateChanged tx -> Gen (StateEvent tx)
genStateEvent sc = StateEvent <$> arbitrary <*> pure sc
86 changes: 86 additions & 0 deletions hydra-node/src/Hydra/Events/FileBased.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-- | A file-based event source and sink using JSON encoding.
--
-- This serves as an example of how to create an 'EventSource' and 'EventSink'.
module Hydra.Events.FileBased where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, writeTVar)
import Hydra.Chain (IsChainState)
import Hydra.Events (EventSink (..), EventSource (..), StateEvent (..))
import Hydra.HeadLogic.Outcome (StateChanged)
import Hydra.Persistence (PersistenceIncremental (..))

-- | A basic file based event source and sink defined using an
-- 'PersistenceIncremental' handle.
--
-- The complexity in this implementation mostly stems from the fact that we want
-- to be backward-compatible with the old, plain format of storing
-- 'StateChanged' items directly to disk using 'PersistenceIncremental'.
--
-- If any 'Legacy StateChanged' items are discovered, a running index is used
-- for the 'eventId', while the 'New StateEvent' values are just stored as is.
--
-- A new implementation for an 'EventSource' with a compatible 'EventSink' could
-- be defined more generically with constraints:
--
-- (ToJSON e, FromJSON e, HasEventId) e => (EventSource e m, EventSink e m)
eventPairFromPersistenceIncremental ::
(IsChainState tx, MonadSTM m) =>
PersistenceIncremental (PersistedStateChange tx) m ->
m (EventSource (StateEvent tx) m, EventSink (StateEvent tx) m)
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = do
eventIdV <- newTVarIO Nothing
let
getLastSeenEventId = readTVar eventIdV

setLastSeenEventId StateEvent{eventId} = do
writeTVar eventIdV (Just eventId)

getNextEventId =
maybe 0 (+ 1) <$> readTVar eventIdV

-- Keep track of the last seen event id when loading
getEvents = do
items <- loadAll
atomically . forM items $ \i -> do
event <- case i of
New e -> pure e
Legacy sc -> do
eventId <- getNextEventId
pure $ StateEvent eventId sc

setLastSeenEventId event
pure event

-- Filter events that are already stored
putEvent e@StateEvent{eventId} = do
atomically getLastSeenEventId >>= \case
Nothing -> store e
Just lastSeenEventId
| eventId > lastSeenEventId -> store e
| otherwise -> pure ()

store e = do
append (New e)
atomically $ setLastSeenEventId e

pure (EventSource{getEvents}, EventSink{putEvent})

-- | Internal data type used by 'createJSONFileEventSourceAndSink' to be
-- compatible with plain usage of 'PersistenceIncrementa' using plain
-- 'StateChanged' items to the new 'StateEvent' persisted items.
data PersistedStateChange tx
= Legacy (StateChanged tx)
| New (StateEvent tx)
deriving stock (Generic, Show, Eq)

instance IsChainState tx => ToJSON (PersistedStateChange tx) where
toJSON = \case
Legacy sc -> toJSON sc
New e -> toJSON e

instance IsChainState tx => FromJSON (PersistedStateChange tx) where
parseJSON v =
New <$> parseJSON v
<|> Legacy <$> parseJSON v
Loading
Loading