EventSource and EventSink abstractions for Hydra Extensibility #1351
@Quantumplation @cardenaso11 Tested backward compatibility of Using current master stores items in
Starting this version of the node then converts items to the new format:
So this means, the current implementation does not override old entries in the persistence. If we want this, the current implementation of |
…entSink.

It's not clear yet if we can have the statechange ID in the StateChanged type as a substitute for tracking the current stateChangeID in the node.
But for now it works to get the node's stateChangedID into the disk-persistence eventSink's putEvent', where we can compare it against the last persisted stateChange.
This works for the particular disk-based eventSink where we *do have* exactly-once, but in e.g. a Kinesis PoC or Redis or something, it can be the key for at-least-once.
dict lint
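The comparison against the last persisted id described above could be sketched as follows. This is only an illustration under assumptions: `Event`, `newDedupingPut` and the `IORef`-based bookkeeping are hypothetical names and choices, not code from this PR.

```haskell
import Control.Monad (when)
import Data.IORef (newIORef, readIORef, writeIORef)
import Numeric.Natural (Natural)

-- Assumed id type; the commit only speaks of a "stateChange ID".
type EventId = Natural

-- Hypothetical event shape: an id plus an opaque payload.
data Event a = Event {eventId :: EventId, payload :: a}

-- Wrap a raw write action so that events whose id is not greater than the
-- last persisted id are skipped. This turns at-least-once delivery into
-- idempotent writes, assuming ids only ever increase.
newDedupingPut :: Maybe EventId -> (Event a -> IO ()) -> IO (Event a -> IO ())
newDedupingPut lastPersisted write = do
  lastRef <- newIORef lastPersisted
  pure $ \event -> do
    lastSeen <- readIORef lastRef
    when (maybe True (eventId event >) lastSeen) $ do
      write event
      writeIORef lastRef (Just (eventId event))
```

A sink for an external service could apply the same check on the receiving side, using the event id as the deduplication key.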
We can define an event source and event sink from the PersistenceIncremental.
Having a dedicated module allows easier import in forks of hydra and hopefully serves as a better extension points (with less breakage).
We kept the EventSource/EventSink very abstract so that implementations do not rely on the internals of the actual data type used. However, an event source / sink will need to at least identify individual events to tell them apart, e.g. to deduplicate them in memory.
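A minimal sketch of what such an abstract interface might look like, assuming a `HasEventId` class and record-of-functions handles. The exact names and constraints in the PR may differ, and `inMemorySourceAndSink` is a hypothetical helper added purely for illustration.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Numeric.Natural (Natural)

-- Assumed identifier type; the text only requires events to be identifiable.
type EventId = Natural

-- The only thing a source or sink may rely on: events can be told apart.
class HasEventId a where
  getEventId :: a -> EventId

-- A source yields all previously stored events, e.g. when the node starts.
newtype EventSource e m = EventSource {getEvents :: m [e]}

-- A sink consumes one event at a time.
newtype EventSink e m = EventSink {putEvent :: e -> m ()}

-- Hypothetical in-memory source/sink pair sharing one IORef.
inMemorySourceAndSink :: IO (EventSource e IO, EventSink e IO)
inMemorySourceAndSink = do
  ref <- newIORef []
  let source = EventSource{getEvents = reverse <$> readIORef ref}
      sink = EventSink{putEvent = \e -> modifyIORef' ref (e :)}
  pure (source, sink)
```

Because both handles are parameterised over the event type `e`, an implementation cannot inspect anything beyond what a class such as `HasEventId` exposes.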
Current idea: Keep notes where extensions are possible so we don't change things by accident breaking external/custom code of users.
Also restore usage of persistenceIncremental in Hydra.Network.Reliability. We do not want to update this module (yet) with a different means for persistence.
By making the step-wise construction of HydraNode monadic (although not specifically needed) we can simplify call-sites in the NodeSpec. Also merge loadStateEventSource into hydrate function.
This is unfortunately very "white-boxy" and the testing code needs to know that the generated events must be using parameters consistent with the environment. The alternative would be to move checkHeadState outside of hydrate, but then 'WetHydraNode's can exist without this check happening.
This generic dropping is needed on the Node-level (or InputQueue-level) to handle some cases that are generated in the new NodeSpec (which uses more property-based testing now).
We do not want to change the HeadLogic signature nor add event ids to the StateChanged values. Instead, keeping track of the next event id should be orthogonal to the main protocol logic.
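One way to keep id tracking outside the protocol logic is a small id supply owned by the node. This is a sketch under assumptions: `newEventIdSupply` and `tagEvents` are hypothetical names, and an `IORef` stands in for whatever concurrency primitive the node actually uses.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)
import Numeric.Natural (Natural)

type EventId = Natural

-- Wrapper pairing an unchanged protocol-level value with its id.
data StateEvent c = StateEvent {eventId :: EventId, stateChanged :: c}
  deriving (Eq, Show)

-- Create an id generator that continues after the last id loaded from the
-- event source (or starts at 0 if nothing was loaded).
newEventIdSupply :: Maybe EventId -> IO (IO EventId)
newEventIdSupply lastLoaded = do
  ref <- newIORef (maybe 0 (+ 1) lastLoaded)
  pure $ atomicModifyIORef' ref (\n -> (n + 1, n))

-- Tag the changes produced by the protocol logic, which never sees the ids.
tagEvents :: IO EventId -> [c] -> IO [StateEvent c]
tagEvents nextId =
  mapM $ \change -> do
    i <- nextId
    pure (StateEvent i change)
```

The protocol logic keeps returning plain state changes; only the node-level loop knows about `StateEvent`.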
…urce/sink
These ensure that the new EventSource/EventSink interface is also backward compatible with directly using the PersistenceIncremental with StateChanged items.
This implementation and its tests will serve as an example for an EventSource and EventSink.
-- | Convert a 'DraftHydraNode' to a 'HydraNode' by providing mock implementations.
notConnect :: MonadThrow m => DraftHydraNode SimpleTx m -> m (HydraNode SimpleTx m)
notConnect =
maybe mockConnect?
Also a good name. I do assume though that this is a mere "Could rename" suggestion?
events <- atomically . forM stateChanges $ \stateChanged -> do
  eventId <- getNextEventId
  pure StateEvent{eventId, stateChanged}
putEventsToSinks eventSinks events
This is the only part that bothers me.
Do we really want the node to crash if a sink is not available?
Can't we have a better strategy here?
Maybe there is a plan for the future; can we elaborate?
This has been defined in the ADR:
"The stepHydraNode main loop does call putEvent on all eventSinks in sequence. Any failure will make the hydra-node process terminate and require a restart."
This is no different from how it was before: if the persistence failed to write to disk, it would bring down the whole node.
"do we really want to have the node crashing if a sink is not available?"
What's the alternative? We can't continue if a sink failed to process an event.
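The behaviour discussed here, sequential delivery with no error handling, might look roughly like the sketch below. It assumes a simplified `EventSink` record; `failingSink`, `newRecordingSink` and `tryDeliver` are hypothetical helpers that only exist to make the failure visible.

```haskell
import Control.Exception (SomeException, throwIO, try)
import Control.Monad (forM_)
import Data.IORef (modifyIORef', newIORef, readIORef)

newtype EventSink e m = EventSink {putEvent :: e -> m ()}

-- Deliver every event to every sink, in order. Nothing is caught here, so an
-- exception from any sink propagates to the caller and stops delivery.
putEventsToSinks :: Monad m => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks events =
  forM_ events $ \event ->
    forM_ sinks $ \sink ->
      putEvent sink event

-- Hypothetical sink that always fails, standing in for an unreachable service.
failingSink :: EventSink e IO
failingSink = EventSink{putEvent = \_ -> throwIO (userError "sink unavailable")}

-- Hypothetical sink recording events in memory, plus an action to read them.
newRecordingSink :: IO (EventSink e IO, IO [e])
newRecordingSink = do
  ref <- newIORef []
  pure (EventSink{putEvent = \e -> modifyIORef' ref (e :)}, reverse <$> readIORef ref)

-- What an outside observer would see; the node itself would not catch this.
tryDeliver :: [EventSink e IO] -> [e] -> IO (Either SomeException ())
tryDeliver sinks = try . putEventsToSinks sinks
```

With sinks `[recording, failingSink]`, the first event reaches the recording sink and then delivery aborts, which is why a restarted node may hand the same event to some sinks again and sinks benefit from being idempotent.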
Implements EventSource and EventSink abstractions as outlined in this ADR.
A node hydrates from an EventSource.
Events are directed to any number of EventSinks, which are responsible for emitting them to external services, persisting them to disk, etc.
This further abstracts the notion of persistence and allows forks of the hydra node to implement their own integrations, such as writing to a Kinesis stream, an S3 bucket, or even some kind of decentralized storage provider.
While there is currently no plan to make this directly pluggable within the Hydra node in the near future, it does pin down the API between the hydra node and its forks, so that any ongoing changes to the hydra node which don't interfere with this interface have very little impact on the upstream fork and can be easily merged.