15
15
-- using a GRPC client. We can only write and read from the cluster while
16
16
-- connected to the majority cluster.
17
17
--
18
- -- Broadcasting is implemented using @put@ to some well-known key, while
19
- -- message delivery is done by using @watch@ on the same key . We keep a last
20
- -- known revision, also stored on disk, to start 'watch' with that revision (+1)
21
- -- and only deliver messages that were not seen before. In case we are not
22
- -- connected to our 'etcd' instance or not enough peers (= on a minority
23
- -- cluster), we retry sending, but also store messages to broadcast in a
24
- -- 'PersistentQueue', which makes the node resilient against crashes while
25
- -- sending. TODO: Is this needed? performance limitation?
18
+ -- Broadcasting is implemented using @put@ to a key under the well-known 'msg' prefix, while message
19
+ -- delivery is done by using @watch@ on the same 'msg' prefix. We keep a last known
20
+ -- revision, also stored on disk, to start 'watch' with that revision (+1) and
21
+ -- only deliver messages that were not seen before. In case we are not connected
22
+ -- to our 'etcd' instance or not enough peers (= on a minority cluster), we
23
+ -- retry sending, but also store messages to broadcast in a 'PersistentQueue',
24
+ -- which makes the node resilient against crashes while sending. TODO: Is this
25
+ -- needed? performance limitation?
26
26
--
27
27
-- Connectivity and compatibility with other nodes on the cluster is tracked
28
28
-- using the key-value service as well:
29
29
--
30
30
-- * network connectivity is determined by being able to fetch the member list
31
31
-- * peer connectivity is tracked (best effort, not authorized) using an entry
32
- -- at 'alive-\<node id \>' keys with individual leases and repeated keep-alives
32
+ -- at 'alive-\<advertise\>' keys with individual leases and repeated keep-alives
33
33
-- * each node compare-and-swaps its `version` into a key of the same name to
34
34
-- check compatibility (not updatable)
35
35
--
@@ -73,7 +73,6 @@ import Hydra.Network (
73
73
NetworkCallback (.. ),
74
74
NetworkComponent ,
75
75
NetworkConfiguration (.. ),
76
- PortNumber ,
77
76
hydraVersionedProtocolNumber ,
78
77
)
79
78
import Network.GRPC.Client (
@@ -134,7 +133,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
134
133
race_ (pollConnectivity tracer conn advertise callback) $ do
135
134
race_ (waitMessages tracer conn protocolVersion persistenceDir callback) $ do
136
135
queue <- newPersistentQueue (persistenceDir </> " pending-broadcast" ) 100
137
- race_ (broadcastMessages tracer conn protocolVersion (port listen) queue) $ do
136
+ race_ (broadcastMessages tracer conn protocolVersion advertise queue) $ do
138
137
action
139
138
Network
140
139
{ broadcast = writePersistentQueue queue
@@ -227,22 +226,23 @@ matchVersion ::
227
226
Maybe HydraHandshakeRefused
228
227
matchVersion key ourVersion = do
229
228
case splitOn " -" $ decodeUtf8 key of
230
- [_prefix, versionText, port] ->
229
+ [_prefix, versionText, hostText] -> do
230
+ let remoteHost = fromMaybe (Host " ???" 0 ) . readMaybe $ toString hostText
231
231
case parseVersion versionText of
232
232
Just theirVersion
233
233
| ourVersion == theirVersion -> Nothing
234
234
| otherwise ->
235
235
-- TODO: DRY up the two near-identical 'Just' cases
236
236
Just
237
237
HydraHandshakeRefused
238
- { remoteHost = Host " ??? " $ fromMaybe 0 $ parsePort port
238
+ { remoteHost
239
239
, ourVersion
240
240
, theirVersions = KnownHydraVersions [theirVersion]
241
241
}
242
242
Nothing ->
243
243
Just
244
244
HydraHandshakeRefused
245
- { remoteHost = Host " ??? " $ fromMaybe 0 $ parsePort port
245
+ { remoteHost
246
246
, ourVersion
247
247
, theirVersions = NoKnownHydraVersions
248
248
}
@@ -256,8 +256,6 @@ matchVersion key ourVersion = do
256
256
where
257
257
parseVersion = fmap MkHydraVersionedProtocolNumber . readMaybe . toString
258
258
259
- parsePort = readMaybe . toString
260
-
261
259
-- | Broadcast messages from a queue to the etcd cluster.
262
260
--
263
261
-- TODO: Is retrying on failure even needed?
@@ -267,13 +265,14 @@ broadcastMessages ::
267
265
Tracer IO EtcdLog ->
268
266
Connection ->
269
267
HydraVersionedProtocolNumber ->
270
- PortNumber ->
268
+ -- | Used to identify sender.
269
+ Host ->
271
270
PersistentQueue IO msg ->
272
271
IO ()
273
- broadcastMessages tracer conn protocolVersion port queue =
272
+ broadcastMessages tracer conn protocolVersion ourHost queue =
274
273
withGrpcContext " broadcastMessages" . forever $ do
275
274
msg <- peekPersistentQueue queue
276
- (putMessage conn protocolVersion port msg >> popPersistentQueue queue msg)
275
+ (putMessage conn protocolVersion ourHost msg >> popPersistentQueue queue msg)
277
276
`catch` \ case
278
277
GrpcException {grpcError, grpcErrorMessage}
279
278
| grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded -> do
@@ -286,10 +285,11 @@ putMessage ::
286
285
ToCBOR msg =>
287
286
Connection ->
288
287
HydraVersionedProtocolNumber ->
289
- PortNumber ->
288
+ -- | Used to identify sender.
289
+ Host ->
290
290
msg ->
291
291
IO ()
292
- putMessage conn protocolVersion port msg =
292
+ putMessage conn protocolVersion ourHost msg =
293
293
void $ nonStreaming conn (rpc @ (Protobuf KV " put" )) req
294
294
where
295
295
req =
@@ -298,7 +298,7 @@ putMessage conn protocolVersion port msg =
298
298
& # value .~ serialize' msg
299
299
300
300
-- TODO: use one key again (after mapping version check)?
301
- key = encodeUtf8 @ Text $ " msg-" <> show (hydraVersionedProtocolNumber protocolVersion) <> " -" <> show port
301
+ key = encodeUtf8 @ Text $ " msg-" <> show (hydraVersionedProtocolNumber protocolVersion) <> " -" <> show ourHost
302
302
303
303
-- | Fetch and wait for messages from the etcd cluster.
304
304
waitMessages ::
@@ -386,45 +386,53 @@ pollConnectivity tracer conn advertise NetworkCallback{onConnectivity} = do
386
386
onConnectivity NetworkConnected
387
387
-- Write our alive key using lease
388
388
writeAlive leaseId
389
+ traceWith tracer CreatedLease {leaseId}
389
390
withKeepAlive leaseId $ \ keepAlive ->
390
391
forever $ do
391
392
-- Keep our lease alive
392
- keepAlive
393
+ ttlRemaining <- keepAlive
394
+ when (ttlRemaining < 1 ) $
395
+ traceWith tracer LowLeaseTTL {ttlRemaining}
393
396
-- Determine alive peers
394
397
alive <- getAlive
398
+ traceWith tracer CurrentlyAlive {alive}
395
399
let othersAlive = alive \\ [advertise]
396
400
seenAlive <- atomically $ swapTVar seenAliveVar othersAlive
397
401
forM_ (othersAlive \\ seenAlive) $ onConnectivity . PeerConnected
398
402
forM_ (seenAlive \\ othersAlive) $ onConnectivity . PeerDisconnected
399
- threadDelay 1
403
+ -- Wait roughly ttl / 2
404
+ threadDelay (ttlRemaining / 2 )
400
405
where
401
- ttl = 3
402
-
403
406
onGrpcException seenAliveVar GrpcException {grpcError}
404
407
| grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded = do
405
408
onConnectivity NetworkDisconnected
406
409
atomically $ writeTVar seenAliveVar []
407
410
threadDelay 1
408
411
onGrpcException _ e = throwIO e
409
412
410
- -- REVIEW: server can decide ttl?
411
413
createLease = withGrpcContext " createLease" $ do
412
414
leaseResponse <-
413
415
nonStreaming conn (rpc @ (Protobuf Lease " leaseGrant" )) $
414
- defMessage & # ttl .~ ttl
416
+ defMessage & # ttl .~ 3
415
417
pure $ leaseResponse ^. # id
416
418
419
+ withKeepAlive leaseId action = do
420
+ biDiStreaming conn (rpc @ (Protobuf Lease " leaseKeepAlive" )) $ \ send recv -> do
421
+ void . action $ do
422
+ send $ NextElem $ defMessage & # id .~ leaseId
423
+ recv >>= \ case
424
+ NextElem res -> pure . fromIntegral $ res ^. # ttl
425
+ NoNextElem -> do
426
+ traceWith tracer NoKeepAliveResponse
427
+ pure 0
428
+
417
429
writeAlive leaseId = withGrpcContext " writeAlive" $ do
418
430
void . nonStreaming conn (rpc @ (Protobuf KV " put" )) $
419
431
defMessage
420
432
& # key .~ " alive-" <> show advertise
421
433
& # value .~ serialize' advertise
422
434
& # lease .~ leaseId
423
435
424
- withKeepAlive leaseId action = do
425
- biDiStreaming conn (rpc @ (Protobuf Lease " leaseKeepAlive" )) $ \ send _recv ->
426
- void . action $ send $ NextElem (defMessage & # id .~ leaseId)
427
-
428
436
getAlive = withGrpcContext " getAlive" $ do
429
437
res <-
430
438
nonStreaming conn (rpc @ (Protobuf KV " range" )) $
@@ -550,6 +558,10 @@ data EtcdLog
550
558
| BroadcastFailed { reason :: Text }
551
559
| FailedToDecodeLog { log :: Text , reason :: Text }
552
560
| FailedToDecodeValue { key :: Text , value :: Text , reason :: Text }
561
+ | CreatedLease { leaseId :: Int64 }
562
+ | LowLeaseTTL { ttlRemaining :: DiffTime }
563
+ | CurrentlyAlive { alive :: [Host ]}
564
+ | NoKeepAliveResponse
553
565
deriving stock (Eq , Show , Generic )
554
566
deriving anyclass (ToJSON )
555
567
0 commit comments