@@ -21,6 +21,7 @@ import Hydra.Cardano.Api (
21
21
SocketPath ,
22
22
Tx ,
23
23
UTxO ,
24
+ chainTipToChainPoint ,
24
25
connectToLocalNode ,
25
26
getTxBody ,
26
27
getTxId ,
@@ -52,8 +53,13 @@ import Ouroboros.Network.Protocol.ChainSync.Client (
52
53
ClientStNext (.. ),
53
54
)
54
55
55
- main :: IO ()
56
- main = do
56
+ type ObserverHandler m = [HeadObservation ] -> m ()
57
+
58
+ defaultObserverHandler :: Applicative m => ObserverHandler m
59
+ defaultObserverHandler = const $ pure ()
60
+
61
+ main :: ObserverHandler IO -> IO ()
62
+ main observerHandler = do
57
63
Options {networkId, nodeSocket, startChainFrom} <- execParser hydraChainObserverOptions
58
64
withTracer (Verbose " hydra-chain-observer" ) $ \ tracer -> do
59
65
traceWith tracer KnownScripts {scriptInfo = Contract. scriptInfo}
@@ -64,7 +70,7 @@ main = do
64
70
traceWith tracer StartObservingFrom {chainPoint}
65
71
connectToLocalNode
66
72
(connectInfo nodeSocket networkId)
67
- (clientProtocols tracer networkId chainPoint)
73
+ (clientProtocols tracer networkId chainPoint observerHandler )
68
74
69
75
type ChainObserverLog :: Type
70
76
data ChainObserverLog
@@ -79,7 +85,7 @@ data ChainObserverLog
79
85
| HeadAbortTx { headId :: HeadId }
80
86
| HeadContestTx { headId :: HeadId }
81
87
| Rollback { point :: ChainPoint }
82
- | RollForward { receivedTxIds :: [TxId ]}
88
+ | RollForward { point :: ChainPoint , receivedTxIds :: [TxId ]}
83
89
deriving stock (Eq , Show , Generic )
84
90
deriving anyclass (ToJSON )
85
91
@@ -101,10 +107,11 @@ clientProtocols ::
101
107
Tracer IO ChainObserverLog ->
102
108
NetworkId ->
103
109
ChainPoint ->
110
+ ObserverHandler IO ->
104
111
LocalNodeClientProtocols BlockType ChainPoint ChainTip slot tx txid txerr query IO
105
- clientProtocols tracer networkId startingPoint =
112
+ clientProtocols tracer networkId startingPoint observerHandler =
106
113
LocalNodeClientProtocols
107
- { localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint
114
+ { localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint observerHandler
108
115
, localTxSubmissionClient = Nothing
109
116
, localStateQueryClient = Nothing
110
117
, localTxMonitoringClient = Nothing
@@ -128,8 +135,9 @@ chainSyncClient ::
128
135
Tracer m ChainObserverLog ->
129
136
NetworkId ->
130
137
ChainPoint ->
138
+ ObserverHandler m ->
131
139
ChainSyncClient BlockType ChainPoint ChainTip m ()
132
- chainSyncClient tracer networkId startingPoint =
140
+ chainSyncClient tracer networkId startingPoint observerHandler =
133
141
ChainSyncClient $
134
142
pure $
135
143
SendMsgFindIntersect [startingPoint] clientStIntersect
@@ -143,44 +151,53 @@ chainSyncClient tracer networkId startingPoint =
143
151
ChainSyncClient $ throwIO (IntersectionNotFound startingPoint)
144
152
}
145
153
146
- clientStIdle :: UTxO -> ClientStIdle BlockType ChainPoint tip m ()
154
+ clientStIdle :: UTxO -> ClientStIdle BlockType ChainPoint ChainTip m ()
147
155
clientStIdle utxo = SendMsgRequestNext (clientStNext utxo) (pure $ clientStNext utxo)
148
156
149
- clientStNext :: UTxO -> ClientStNext BlockType ChainPoint tip m ()
157
+ clientStNext :: UTxO -> ClientStNext BlockType ChainPoint ChainTip m ()
150
158
clientStNext utxo =
151
159
ClientStNext
152
- { recvMsgRollForward = \ blockInMode _tip -> ChainSyncClient $ do
160
+ { recvMsgRollForward = \ blockInMode tip -> ChainSyncClient $ do
153
161
case blockInMode of
154
162
BlockInMode _ (Block _header txs) BabbageEraInCardanoMode -> do
155
- traceWith tracer RollForward {receivedTxIds = getTxId . getTxBody <$> txs}
156
- let (utxo', logs) = observeAll networkId utxo txs
157
- forM_ logs (traceWith tracer)
163
+ let point = chainTipToChainPoint tip
164
+ let receivedTxIds = getTxId . getTxBody <$> txs
165
+ traceWith tracer RollForward {point, receivedTxIds}
166
+ let (utxo', observations) = observeAll networkId utxo txs
167
+ -- FIXME we should be exposing OnChainTx instead of working around NoHeadTx.
168
+ forM_ observations (maybe (pure () ) (traceWith tracer) . logObservation)
169
+ observerHandler observations
158
170
pure $ clientStIdle utxo'
159
171
_ -> pure $ clientStIdle utxo
160
172
, recvMsgRollBackward = \ point _tip -> ChainSyncClient $ do
161
173
traceWith tracer Rollback {point}
162
174
pure $ clientStIdle utxo
163
175
}
164
176
165
- observeTx :: NetworkId -> UTxO -> Tx -> (UTxO , Maybe ChainObserverLog )
177
+ logObservation :: HeadObservation -> Maybe ChainObserverLog
178
+ logObservation = \ case
179
+ NoHeadTx -> Nothing
180
+ Init InitObservation {headId} -> pure $ HeadInitTx {headId}
181
+ Commit CommitObservation {headId} -> pure $ HeadCommitTx {headId}
182
+ CollectCom CollectComObservation {headId} -> pure $ HeadCollectComTx {headId}
183
+ Close CloseObservation {headId} -> pure $ HeadCloseTx {headId}
184
+ Fanout FanoutObservation {headId} -> pure $ HeadFanoutTx {headId}
185
+ Abort AbortObservation {headId} -> pure $ HeadAbortTx {headId}
186
+ Contest ContestObservation {headId} -> pure $ HeadContestTx {headId}
187
+
188
+ observeTx :: NetworkId -> UTxO -> Tx -> (UTxO , Maybe HeadObservation )
166
189
observeTx networkId utxo tx =
167
190
let utxo' = adjustUTxO tx utxo
168
191
in case observeHeadTx networkId utxo tx of
169
192
NoHeadTx -> (utxo, Nothing )
170
- Init InitObservation {headId} -> (utxo', pure $ HeadInitTx {headId})
171
- Commit CommitObservation {headId} -> (utxo', pure $ HeadCommitTx {headId})
172
- CollectCom CollectComObservation {headId} -> (utxo', pure $ HeadCollectComTx {headId})
173
- Close CloseObservation {headId} -> (utxo', pure $ HeadCloseTx {headId})
174
- Fanout FanoutObservation {headId} -> (utxo', pure $ HeadFanoutTx {headId})
175
- Abort AbortObservation {headId} -> (utxo', pure $ HeadAbortTx {headId})
176
- Contest ContestObservation {headId} -> (utxo', pure $ HeadContestTx {headId})
177
-
178
- observeAll :: NetworkId -> UTxO -> [Tx ] -> (UTxO , [ChainObserverLog ])
193
+ observation -> (utxo', pure observation)
194
+
195
+ observeAll :: NetworkId -> UTxO -> [Tx ] -> (UTxO , [HeadObservation ])
179
196
observeAll networkId utxo txs =
180
197
second reverse $ foldr go (utxo, [] ) txs
181
198
where
182
- go :: Tx -> (UTxO , [ChainObserverLog ]) -> (UTxO , [ChainObserverLog ])
183
- go tx (utxo'', logs ) =
199
+ go :: Tx -> (UTxO , [HeadObservation ]) -> (UTxO , [HeadObservation ])
200
+ go tx (utxo'', observations ) =
184
201
case observeTx networkId utxo'' tx of
185
- (utxo', Nothing ) -> (utxo', logs )
186
- (utxo', Just logEntry ) -> (utxo', logEntry : logs )
202
+ (utxo', Nothing ) -> (utxo', observations )
203
+ (utxo', Just observation ) -> (utxo', observation : observations )
0 commit comments