Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create EventSource and EventSink handles #1267

Closed
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
22ead15
initial work (WIP)
cardenaso11 Jan 16, 2024
4182e5b
WIP checkpoint
cardenaso11 Feb 13, 2024
2585923
checkpoint apiserver
cardenaso11 Feb 13, 2024
75c56e4
WIP: stateChangeID into StateChanged, routed into disk-persistence ev…
cardenaso11 Feb 15, 2024
b0576d0
remove inaccurate fixme
cardenaso11 Feb 15, 2024
94d2995
checkpoint: lib build
cardenaso11 Feb 19, 2024
a0133ae
WIP: going back and forth on how to manage state in tests
cardenaso11 Feb 19, 2024
16890a2
eventid
cardenaso11 Feb 19, 2024
e4fe10e
remove newpersistence data type (in exchange for type synonym) stuff …
cardenaso11 Feb 19, 2024
d692b7d
checkpoint for testing
cardenaso11 Feb 19, 2024
2b22fe2
some tests
cardenaso11 Feb 20, 2024
b72c15d
checkpoint: fix all hydra-node tests besides golden
cardenaso11 Feb 21, 2024
e0be84a
Use eventPairFromPersistenceIncremental
cardenaso11 Mar 4, 2024
f8be89f
Use the original PersistenceIncremental in withAPIServer
ch1bo Mar 4, 2024
3679cf5
Only use eventPairFromPersistenceIncremental
ch1bo Mar 4, 2024
11da78f
remove NewPersistenceIncremental type, use eventSource + eventSinks d…
cardenaso11 Mar 4, 2024
02f3c46
Move EventSource and EventSink definitions into Hydra.Events
ch1bo Mar 4, 2024
1fde992
Add HasEventId class to ensure events can be identified
ch1bo Mar 4, 2024
0e59d98
Draft extension points in Hydra.Node.Run
ch1bo Mar 4, 2024
f0ce297
Rename getEvents' -> getEvents and putEvent' -> putEvent
ch1bo Mar 4, 2024
8a00c06
Restore usage of persistenceIncremental in Hydra.Network.Reliability
ch1bo Mar 4, 2024
dbd3be3
Draft a new way to wire HydraNode in Node.Run
ch1bo Mar 11, 2024
a058ff8
Add node-level tests for event source / sink handling
ch1bo Mar 11, 2024
68e9ab4
Unify Hydra.Node function signatures and refine NodeSpec
ch1bo Mar 11, 2024
e5c2eec
Merge loadStateEventSource into hydrate function
ch1bo Mar 11, 2024
46d83d4
Move Environment into dedicated module
ch1bo Mar 12, 2024
a71cb88
Generate compatible state change events when testing hydrate
ch1bo Mar 12, 2024
fa7d970
Add more tests on event ids being strictly monotonic
ch1bo Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
checkpoint: lib build
cardenaso11 authored and ch1bo committed Mar 11, 2024
commit 94d2995ef6a322d45d896c3ec47f054fb2edca66
78 changes: 46 additions & 32 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
@@ -129,8 +129,9 @@ onIdleChainInitTx ::
HeadSeed ->
HeadParameters ->
[OnChainId] ->
Word64 ->
Outcome tx
onIdleChainInitTx env newChainState headId headSeed headParameters participants
onIdleChainInitTx env newChainState headId headSeed headParameters participants nextStateChangeID
| configuredParties == initializedParties
&& party `member` initializedParties
&& configuredContestationPeriod == contestationPeriod
@@ -141,7 +142,7 @@ onIdleChainInitTx env newChainState headId headSeed headParameters participants
, chainState = newChainState
, headId
, headSeed
, stateChangeID = 0
, stateChangeID = nextStateChangeID
}
<> cause (ClientEffect $ ServerOutput.HeadIsInitializing{headId, parties})
| otherwise =
@@ -181,9 +182,10 @@ onInitialChainCommitTx ::
Party ->
-- | Committed UTxO
UTxOType tx ->
Word64 ->
Outcome tx
onInitialChainCommitTx st newChainState pt utxo =
newState CommittedUTxO{party = pt, committedUTxO = utxo, chainState = newChainState}
onInitialChainCommitTx st newChainState pt utxo nextStateChangeID =
newState CommittedUTxO{party = pt, committedUTxO = utxo, chainState = newChainState, stateChangeID = nextStateChangeID}
<> causes
( notifyClient
: [postCollectCom | canCollectCom]
@@ -232,9 +234,10 @@ onInitialChainAbortTx ::
ChainStateType tx ->
Committed tx ->
HeadId ->
Word64 ->
Outcome tx
onInitialChainAbortTx newChainState committed headId =
newState HeadAborted{chainState = newChainState}
onInitialChainAbortTx newChainState committed headId nextStateChangeID =
newState HeadAborted{chainState = newChainState, stateChangeID = nextStateChangeID}
<> cause (ClientEffect $ ServerOutput.HeadIsAborted{headId, utxo = fold committed})

-- | Observe a collectCom transaction. We initialize the 'OpenState' using the
@@ -247,9 +250,10 @@ onInitialChainCollectTx ::
InitialState tx ->
-- | New chain state
ChainStateType tx ->
Word64 ->
Outcome tx
onInitialChainCollectTx st newChainState =
newState HeadOpened{chainState = newChainState, initialUTxO = u0}
onInitialChainCollectTx st newChainState nextStateChangeID =
newState HeadOpened{chainState = newChainState, initialUTxO = u0, stateChangeID = nextStateChangeID}
<> cause (ClientEffect $ ServerOutput.HeadIsOpen{headId, utxo = u0})
where
u0 = fold committed
@@ -287,23 +291,24 @@ onOpenNetworkReqTx ::
TTL ->
-- | The transaction to be submitted to the head.
tx ->
Word64 ->
Outcome tx
onOpenNetworkReqTx env ledger st ttl tx =
onOpenNetworkReqTx env ledger st ttl tx nextStateChangeID =
-- Spec: Tall ← ̂Tall ∪ { (hash(tx), tx) }
(newState TransactionReceived{tx} <>) $
(newState TransactionReceived{tx, stateChangeID = nextStateChangeID} <>) $
-- Spec: wait L̂ ◦ tx ≠ ⊥ combined with L̂ ← L̂ ◦ tx
waitApplyTx $ \newLocalUTxO ->
-- Spec: if ŝ = s̄ ∧ leader(s̄ + 1) = i
( if not snapshotInFlight && isLeader parameters party nextSn
then
newState TransactionAppliedToLocalUTxO{tx = tx, newLocalUTxO}
newState TransactionAppliedToLocalUTxO{tx = tx, newLocalUTxO, stateChangeID = nextStateChangeID}
<>
-- XXX: This state update has no equivalence in the
-- spec. Do we really need to store that we have
-- requested a snapshot? If yes, should update spec.
newState SnapshotRequestDecided{snapshotNumber = nextSn}
newState SnapshotRequestDecided{snapshotNumber = nextSn, stateChangeID = nextStateChangeID}
<> cause (NetworkEffect $ ReqSn nextSn (txId <$> localTxs'))
else newState TransactionAppliedToLocalUTxO{tx, newLocalUTxO}
else newState TransactionAppliedToLocalUTxO{tx, newLocalUTxO, stateChangeID = nextStateChangeID}
)
<> cause (ClientEffect $ ServerOutput.TxValid headId tx)
where
@@ -369,8 +374,9 @@ onOpenNetworkReqSn ::
SnapshotNumber ->
-- | List of transactions to snapshot.
[TxIdType tx] ->
Word64 ->
Outcome tx
onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds nextStateChangeID =
-- TODO: Verify the request is signed by (?) / comes from the leader
-- (Can we prove a message comes from a given peer, without signature?)

@@ -399,6 +405,7 @@ onOpenNetworkReqSn env ledger st otherParty sn requestedTxIds =
, requestedTxIds
, newLocalUTxO
, newLocalTxs
, stateChangeID = nextStateChangeID
}
where
requireReqSn continue
@@ -480,8 +487,9 @@ onOpenNetworkAckSn ::
Signature (Snapshot tx) ->
-- | Snapshot number of this AckSn.
SnapshotNumber ->
Word64 ->
Outcome tx
onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn =
onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn nextStateChangeID =
-- TODO: verify authenticity of message and whether otherParty is part of the head
-- Spec: require s ∈ {ŝ, ŝ + 1}
requireValidAckSn $ do
@@ -494,7 +502,7 @@ onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn
let multisig = aggregateInOrder sigs' parties
requireVerifiedMultisignature multisig snapshot $
do
newState SnapshotConfirmed{snapshot, signatures = multisig}
newState SnapshotConfirmed{snapshot, signatures = multisig, stateChangeID = nextStateChangeID}
<> cause (ClientEffect $ ServerOutput.SnapshotConfirmed headId snapshot multisig)
& maybeEmitSnapshot
where
@@ -526,6 +534,7 @@ onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn
{ snapshot
, party = otherParty
, signature = snapshotSignature
, stateChangeID = nextStateChangeID
}

requireVerifiedMultisignature multisig msg cont =
@@ -544,7 +553,7 @@ onOpenNetworkAckSn Environment{party} openState otherParty snapshotSignature sn
if isLeader parameters party nextSn && not (null localTxs)
then
outcome
<> newState SnapshotRequestDecided{snapshotNumber = nextSn}
<> newState SnapshotRequestDecided{snapshotNumber = nextSn, stateChangeID = nextStateChangeID}
<> cause (NetworkEffect $ ReqSn nextSn (txId <$> localTxs))
else outcome

@@ -589,9 +598,10 @@ onOpenChainCloseTx ::
SnapshotNumber ->
-- | Contestation deadline.
UTCTime ->
Word64 ->
Outcome tx
onOpenChainCloseTx openState newChainState closedSnapshotNumber contestationDeadline =
newState HeadClosed{chainState = newChainState, contestationDeadline}
onOpenChainCloseTx openState newChainState closedSnapshotNumber contestationDeadline nextStateChangeID =
newState HeadClosed{chainState = newChainState, contestationDeadline, stateChangeID = nextStateChangeID}
<> causes
( notifyClient
: [ OnChainEffect
@@ -674,9 +684,10 @@ onClosedChainFanoutTx ::
ClosedState tx ->
-- | New chain state
ChainStateType tx ->
Word64 ->
Outcome tx
onClosedChainFanoutTx closedState newChainState =
newState HeadFannedOut{chainState = newChainState}
onClosedChainFanoutTx closedState newChainState nextStateChangeID =
newState HeadFannedOut{chainState = newChainState, stateChangeID = nextStateChangeID}
<> cause (ClientEffect $ ServerOutput.HeadIsFinalized{headId, utxo})
where
Snapshot{utxo} = getSnapshot confirmedSnapshot
@@ -700,17 +711,17 @@ update env ledger nextStateChangeID st ev = case (st, ev) of
(Idle _, ClientInput Init) ->
onIdleClientInit env
(Idle _, ChainInput Observation{observedTx = OnInitTx{headId, headSeed, headParameters, participants}, newChainState}) ->
onIdleChainInitTx env newChainState headId headSeed headParameters participants
onIdleChainInitTx env newChainState headId headSeed headParameters participants nextStateChangeID
(Initial initialState@InitialState{headId = ourHeadId}, ChainInput Observation{observedTx = OnCommitTx{headId, party = pt, committed = utxo}, newChainState})
| ourHeadId == headId -> onInitialChainCommitTx initialState newChainState pt utxo
| ourHeadId == headId -> onInitialChainCommitTx initialState newChainState pt utxo nextStateChangeID
| otherwise -> Error NotOurHead{ourHeadId, otherHeadId = headId}
(Initial initialState, ClientInput Abort) ->
onInitialClientAbort initialState
(Initial initialState@InitialState{headId = ourHeadId}, ChainInput Observation{observedTx = OnCollectComTx{headId}, newChainState})
| ourHeadId == headId -> onInitialChainCollectTx initialState newChainState
| ourHeadId == headId -> onInitialChainCollectTx initialState newChainState nextStateChangeID
| otherwise -> Error NotOurHead{ourHeadId, otherHeadId = headId}
(Initial InitialState{headId = ourHeadId, committed}, ChainInput Observation{observedTx = OnAbortTx{headId}, newChainState})
| ourHeadId == headId -> onInitialChainAbortTx newChainState committed headId
| ourHeadId == headId -> onInitialChainAbortTx newChainState committed headId nextStateChangeID
| otherwise -> Error NotOurHead{ourHeadId, otherHeadId = headId}
(Initial InitialState{committed, headId}, ClientInput GetUTxO) ->
cause (ClientEffect . ServerOutput.GetUTxOResponse headId $ fold committed)
@@ -720,18 +731,18 @@ update env ledger nextStateChangeID st ev = case (st, ev) of
(Open{}, ClientInput (NewTx tx)) ->
onOpenClientNewTx tx
(Open openState, NetworkInput ttl _ (ReqTx tx)) ->
onOpenNetworkReqTx env ledger openState ttl tx
onOpenNetworkReqTx env ledger openState ttl tx nextStateChangeID
(Open openState, NetworkInput _ otherParty (ReqSn sn txIds)) ->
-- XXX: ttl == 0 not handled for ReqSn
onOpenNetworkReqSn env ledger openState otherParty sn txIds
onOpenNetworkReqSn env ledger openState otherParty sn txIds nextStateChangeID
(Open openState, NetworkInput _ otherParty (AckSn snapshotSignature sn)) ->
-- XXX: ttl == 0 not handled for AckSn
onOpenNetworkAckSn env openState otherParty snapshotSignature sn
onOpenNetworkAckSn env openState otherParty snapshotSignature sn nextStateChangeID
( Open openState@OpenState{headId = ourHeadId}
, ChainInput Observation{observedTx = OnCloseTx{headId, snapshotNumber = closedSnapshotNumber, contestationDeadline}, newChainState}
)
| ourHeadId == headId ->
onOpenChainCloseTx openState newChainState closedSnapshotNumber contestationDeadline
onOpenChainCloseTx openState newChainState closedSnapshotNumber contestationDeadline nextStateChangeID
| otherwise ->
Error NotOurHead{ourHeadId, otherHeadId = headId}
(Open OpenState{coordinatedHeadState = CoordinatedHeadState{confirmedSnapshot}, headId}, ClientInput GetUTxO) ->
@@ -756,20 +767,23 @@ update env ledger nextStateChangeID st ev = case (st, ev) of
onClosedClientFanout closedState
(Closed closedState@ClosedState{headId = ourHeadId}, ChainInput Observation{observedTx = OnFanoutTx{headId}, newChainState})
| ourHeadId == headId ->
onClosedChainFanoutTx closedState newChainState
onClosedChainFanoutTx closedState newChainState nextStateChangeID
| otherwise ->
Error NotOurHead{ourHeadId, otherHeadId = headId}
-- General
(_, ChainInput Rollback{rolledBackChainState}) ->
newState ChainRolledBack{chainState = rolledBackChainState}
newState ChainRolledBack{chainState = rolledBackChainState, stateChangeID}
(_, ChainInput Tick{chainSlot}) ->
newState TickObserved{chainSlot}
newState TickObserved{chainSlot, stateChangeID}
-- FIXME(Elaine): should this bump the stateChangeID as well?
(_, ChainInput PostTxError{postChainTx, postTxError}) ->
cause . ClientEffect $ ServerOutput.PostTxOnChainFailed{postChainTx, postTxError}
(_, ClientInput{clientInput}) ->
cause . ClientEffect $ ServerOutput.CommandFailed clientInput st
_ ->
Error $ UnhandledInput ev st
where
stateChangeID = nextStateChangeID

-- * HeadState aggregate

2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
@@ -241,7 +241,7 @@ processNextInput HydraNode{nodeState, ledger, env} e nextStateChangeID =
computeOutcome = Logic.update env ledger nextStateChangeID

processNextStateChange ::
forall m e tx.
forall m tx.
(Monad m, MonadSTM m, ToJSON (StateChanged tx)) =>
HydraNode tx m ->
NonEmpty (EventSink (StateChanged tx) m) ->
1 change: 0 additions & 1 deletion hydra-node/src/Hydra/Node/Run.hs
Original file line number Diff line number Diff line change
@@ -90,7 +90,6 @@ run opts = do
-- let -- (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence
-- eventSinks = eventSink :| [] --FIXME(Elaine): load other event sinks
-- eventSinksSansSource = [] --TODO(Elaine): this needs a better name. essentially, don't load events back into where they came from, at least until disk-based persistence can handle redelivery
let eventSinksSansSource = undefined
persistence@NewPersistenceIncremental{eventSource, eventSinks} <- createNewPersistenceIncremental $ persistenceDir <> "/state"

(hs, chainStateHistory) <- loadStateEventSource (contramap Node tracer) eventSource (NE.toList eventSinks) initialChainState
1 change: 1 addition & 0 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
@@ -167,6 +167,7 @@ createNewPersistenceIncremental fp = do
eventSinks = eventSink :| []
pure NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId}


-- | Initialize persistence handle for given type 'a' at given file path.
--
-- This instance of `PersistenceIncremental` is "thread-safe" in the sense that