Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Etcd observability #1884

Merged
merged 3 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions hydra-node/src/Hydra/Network/Etcd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
-- using a GRPC client. We can only write and read from the cluster while
-- connected to the majority cluster.
--
-- Broadcasting is implemented using @put@ to some well-known key, while
-- message delivery is done by using @watch@ on the same key. We keep a last
-- known revision, also stored on disk, to start 'watch' with that revision (+1)
-- and only deliver messages that were not seen before. In case we are not
-- connected to our 'etcd' instance or not enough peers (= on a minority
-- cluster), we retry sending, but also store messages to broadcast in a
-- 'PersistentQueue', which makes the node resilient against crashes while
-- sending. TODO: Is this needed? performance limitation?
-- Broadcasting is implemented using @put@ to some well-known key, while message
-- delivery is done by using @watch@ on the same 'msg' prefix. We keep a last known
-- revision, also stored on disk, to start 'watch' with that revision (+1) and
-- only deliver messages that were not seen before. In case we are not connected
-- to our 'etcd' instance or not enough peers (= on a minority cluster), we
-- retry sending, but also store messages to broadcast in a 'PersistentQueue',
-- which makes the node resilient against crashes while sending. TODO: Is this
-- needed? performance limitation?
--
-- Connectivity and compatibility with other nodes on the cluster is tracked
-- using the key-value service as well:
--
-- * network connectivity is determined by being able to fetch the member list
-- * peer connectivity is tracked (best effort, not authorized) using an entry
-- at 'alive-\<node id\>' keys with individual leases and repeated keep-alives
-- at 'alive-\<advertise\>' keys with individual leases and repeated keep-alives
-- * each node compare-and-swaps its `version` into a key of the same name to
-- check compatibility (not updatable)
--
Expand Down Expand Up @@ -73,7 +73,6 @@ import Hydra.Network (
NetworkCallback (..),
NetworkComponent,
NetworkConfiguration (..),
PortNumber,
hydraVersionedProtocolNumber,
)
import Network.GRPC.Client (
Expand Down Expand Up @@ -134,7 +133,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
race_ (pollConnectivity tracer conn advertise callback) $ do
race_ (waitMessages tracer conn protocolVersion persistenceDir callback) $ do
queue <- newPersistentQueue (persistenceDir </> "pending-broadcast") 100
race_ (broadcastMessages tracer conn protocolVersion (port listen) queue) $ do
race_ (broadcastMessages tracer conn protocolVersion advertise queue) $ do
action
Network
{ broadcast = writePersistentQueue queue
Expand Down Expand Up @@ -227,22 +226,23 @@ matchVersion ::
Maybe HydraHandshakeRefused
matchVersion key ourVersion = do
case splitOn "-" $ decodeUtf8 key of
[_prefix, versionText, port] ->
[_prefix, versionText, hostText] -> do
let remoteHost = fromMaybe (Host "???" 0) . readMaybe $ toString hostText
case parseVersion versionText of
Just theirVersion
| ourVersion == theirVersion -> Nothing
| otherwise ->
-- TODO: DRY just cases
Just
HydraHandshakeRefused
{ remoteHost = Host "???" $ fromMaybe 0 $ parsePort port
{ remoteHost
, ourVersion
, theirVersions = KnownHydraVersions [theirVersion]
}
Nothing ->
Just
HydraHandshakeRefused
{ remoteHost = Host "???" $ fromMaybe 0 $ parsePort port
{ remoteHost
, ourVersion
, theirVersions = NoKnownHydraVersions
}
Expand All @@ -256,8 +256,6 @@ matchVersion key ourVersion = do
where
parseVersion = fmap MkHydraVersionedProtocolNumber . readMaybe . toString

parsePort = readMaybe . toString

-- | Broadcast messages from a queue to the etcd cluster.
--
-- TODO: retrying on failure even needed?
Expand All @@ -267,13 +265,14 @@ broadcastMessages ::
Tracer IO EtcdLog ->
Connection ->
HydraVersionedProtocolNumber ->
PortNumber ->
-- | Used to identify sender.
Host ->
PersistentQueue IO msg ->
IO ()
broadcastMessages tracer conn protocolVersion port queue =
broadcastMessages tracer conn protocolVersion ourHost queue =
withGrpcContext "broadcastMessages" . forever $ do
msg <- peekPersistentQueue queue
(putMessage conn protocolVersion port msg >> popPersistentQueue queue msg)
(putMessage conn protocolVersion ourHost msg >> popPersistentQueue queue msg)
`catch` \case
GrpcException{grpcError, grpcErrorMessage}
| grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded -> do
Expand All @@ -286,10 +285,11 @@ putMessage ::
ToCBOR msg =>
Connection ->
HydraVersionedProtocolNumber ->
PortNumber ->
-- | Used to identify sender.
Host ->
msg ->
IO ()
putMessage conn protocolVersion port msg =
putMessage conn protocolVersion ourHost msg =
void $ nonStreaming conn (rpc @(Protobuf KV "put")) req
where
req =
Expand All @@ -298,7 +298,7 @@ putMessage conn protocolVersion port msg =
& #value .~ serialize' msg

-- TODO: use one key again (after mapping version check)?
key = encodeUtf8 @Text $ "msg-" <> show (hydraVersionedProtocolNumber protocolVersion) <> "-" <> show port
key = encodeUtf8 @Text $ "msg-" <> show (hydraVersionedProtocolNumber protocolVersion) <> "-" <> show ourHost

-- | Fetch and wait for messages from the etcd cluster.
waitMessages ::
Expand Down Expand Up @@ -386,45 +386,53 @@ pollConnectivity tracer conn advertise NetworkCallback{onConnectivity} = do
onConnectivity NetworkConnected
-- Write our alive key using lease
writeAlive leaseId
traceWith tracer CreatedLease{leaseId}
withKeepAlive leaseId $ \keepAlive ->
forever $ do
-- Keep our lease alive
keepAlive
ttlRemaining <- keepAlive
when (ttlRemaining < 1) $
traceWith tracer LowLeaseTTL{ttlRemaining}
-- Determine alive peers
alive <- getAlive
traceWith tracer CurrentlyAlive{alive}
let othersAlive = alive \\ [advertise]
seenAlive <- atomically $ swapTVar seenAliveVar othersAlive
forM_ (othersAlive \\ seenAlive) $ onConnectivity . PeerConnected
forM_ (seenAlive \\ othersAlive) $ onConnectivity . PeerDisconnected
threadDelay 1
-- Wait roughly ttl / 2
threadDelay (ttlRemaining / 2)
where
ttl = 3

onGrpcException seenAliveVar GrpcException{grpcError}
| grpcError == GrpcUnavailable || grpcError == GrpcDeadlineExceeded = do
onConnectivity NetworkDisconnected
atomically $ writeTVar seenAliveVar []
threadDelay 1
onGrpcException _ e = throwIO e

-- REVIEW: server can decide ttl?
createLease = withGrpcContext "createLease" $ do
leaseResponse <-
nonStreaming conn (rpc @(Protobuf Lease "leaseGrant")) $
defMessage & #ttl .~ ttl
defMessage & #ttl .~ 3
pure $ leaseResponse ^. #id

withKeepAlive leaseId action = do
biDiStreaming conn (rpc @(Protobuf Lease "leaseKeepAlive")) $ \send recv -> do
void . action $ do
send $ NextElem $ defMessage & #id .~ leaseId
recv >>= \case
NextElem res -> pure . fromIntegral $ res ^. #ttl
NoNextElem -> do
traceWith tracer NoKeepAliveResponse
pure 0

writeAlive leaseId = withGrpcContext "writeAlive" $ do
void . nonStreaming conn (rpc @(Protobuf KV "put")) $
defMessage
& #key .~ "alive-" <> show advertise
& #value .~ serialize' advertise
& #lease .~ leaseId

withKeepAlive leaseId action = do
biDiStreaming conn (rpc @(Protobuf Lease "leaseKeepAlive")) $ \send _recv ->
void . action $ send $ NextElem (defMessage & #id .~ leaseId)

getAlive = withGrpcContext "getAlive" $ do
res <-
nonStreaming conn (rpc @(Protobuf KV "range")) $
Expand Down Expand Up @@ -550,6 +558,10 @@ data EtcdLog
| BroadcastFailed {reason :: Text}
| FailedToDecodeLog {log :: Text, reason :: Text}
| FailedToDecodeValue {key :: Text, value :: Text, reason :: Text}
| CreatedLease {leaseId :: Int64}
| LowLeaseTTL {ttlRemaining :: DiffTime}
| CurrentlyAlive {alive :: [Host]}
| NoKeepAliveResponse
deriving stock (Eq, Show, Generic)
deriving anyclass (ToJSON)

Expand Down
6 changes: 3 additions & 3 deletions hydra-node/test/Hydra/NetworkSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,17 @@ spec = do

it "checks protocol version" $ \tracer -> do
withTempDir "test-etcd" $ \tmp -> do
failAfter 5 $ do
failAfter 10 $ do
PeerConfig2{aliceConfig, bobConfig} <- setup2Peers tmp
let v2 = MkHydraVersionedProtocolNumber 2
withEtcdNetwork @Int tracer v1 aliceConfig noopCallback $ \n1 -> do
(recordReceived, _, waitConnectivity) <- newRecordingCallback
withEtcdNetwork @Int tracer v2 bobConfig recordReceived $ \_n2 -> do
broadcast n1 123

waitEq waitConnectivity 10 $
waitEq waitConnectivity 5 $
HandshakeFailure
{ remoteHost = Host "???" aliceConfig.advertise.port
{ remoteHost = aliceConfig.advertise
, ourVersion = v2
, theirVersions = KnownHydraVersions [v1]
}
Expand Down