SSE implementation that sheds stuck clients #14413

Open
kasey wants to merge 5 commits into develop from async-event-streamer

Conversation

Contributor
@kasey kasey commented Sep 3, 2024

What type of PR is this?

Bug fix

What does this PR do? Why is it needed?

This PR separates event stream API processing across two staged queues. The first is a channel where event subscriptions are buffered; the second is an "outbox" with a capped size. The send to the outbox is protected by a select statement, so if the outbox can't be written to, the event is dropped on the floor and a cleanup sequence is triggered, ending the event stream. Validation and filtering happen before the write to the outbox, but serialization is deferred via a closure until the event is ready to be written to the client.
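The guarded send looks roughly like this (a sketch only; the method name, error wording, and exact signature are approximations, not necessarily the code in this PR):

func (es *eventStreamer) safeWrite(ctx context.Context, rf lazyReader) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case es.outbox <- rf:
        return nil
    default:
        // The outbox is full, meaning the client is not keeping up.
        // Drop the event and return an error so the caller can unsubscribe
        // and end the stream.
        return errors.New("client too slow to read events, shutting down stream")
    }
}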

A separate goroutine processes the outbox, draining the queue and calling flush on the response writer only once all pending events have been written to the client, removing unnecessary flushes and regaining the benefits of connection buffering. If the keep-alive timer has fired, a keep-alive message is sent before flushing, but only if no events have been written. Rather than using a ticker, the keep-alive is tracked by a timer that is reset after the client flush completes.

Runtime errors like the wrong type being on a particular feed are logged as server-side errors and no longer pushed to the client as messages.

The unit test now uses what appears to be the popular Go SSE client library to test that expected events are received.

This is still WIP; TODOs (before PR is merged):

  • fix other unit tests (only operations feed test fixed so far) and check if we want to add more coverage
  • rework the test response writer to internally use io.Pipe to more accurately mirror the way the sse library will scan the byte stream (see the sketch after this list). Otherwise the test will be flaky due to the scanner hitting the end of an internal byte buffer and caching an io.EOF error.
  • self-review and cleanup
  • update CHANGELOG
  • do some testing by hand - maybe add a prysmctl command to stream remote events to stdout
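For the io.Pipe item above, the test writer could look something like this (a hypothetical sketch; the type and constructor names are made up):

// Hypothetical test double: writes go into an io.Pipe so the sse client
// scans a real byte stream instead of a fixed buffer that can surface an
// early io.EOF.
type pipeResponseWriter struct {
    pr     *io.PipeReader
    pw     *io.PipeWriter
    header http.Header
}

func newPipeResponseWriter() *pipeResponseWriter {
    pr, pw := io.Pipe()
    return &pipeResponseWriter{pr: pr, pw: pw, header: make(http.Header)}
}

func (w *pipeResponseWriter) Header() http.Header         { return w.header }
func (w *pipeResponseWriter) Write(b []byte) (int, error) { return w.pw.Write(b) }
func (w *pipeResponseWriter) WriteHeader(statusCode int)  {}
func (w *pipeResponseWriter) Flush()                      {} // the pipe delivers bytes as soon as they are written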

Which issue(s) does this PR fix?

Slow readers of the event stream API can cause issues in the node when the queue backs up. The previous implementation used a select loop to read from the event channels and write each event to the http ResponseWriter, followed by a flush. A separate ticker would trigger an HTTP keep-alive without knowledge of what other writes or flushes were in flight. This could result in unwanted backpressure on the event queue.

The stream would subscribe to both the state feed and operation event channels, regardless of whether topics for those channels had been requested. It would also perform a lot of duplicate error checking for conditions that would indicate bugs in the node code itself, treating them as runtime errors to be pushed to the client.

Fixes #

Other notes for review

Acknowledgements

  • I have read CONTRIBUTING.md.
  • I have made an appropriate entry to CHANGELOG.md.
  • I have added a description to this PR with sufficient context for reviewers to understand this PR.

@kasey kasey requested a review from a team as a code owner September 3, 2024 13:39
@@ -26,15 +26,25 @@ import (
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
sse "github.com/r3labs/sse/v2"
Contributor

We're just using this for tests; is it worth using for the event itself? I avoided adding this in the initial implementation to avoid more dependencies.

Contributor Author

I think the benefit for testing is pretty clear as it is testing conformance with the way clients read values. LMK if there's a particular piece of the lib that you think would improve the server side.

Contributor

I'd have to double check myself. I was talking to someone on this library and they mentioned that it did a few too many copies to be performant for his use case; I don't know if that should apply to our use case.

return
}
case <-ctx.Done():
return
if tp == PayloadAttributesTopic || (tp == HeadTopic && requestedTopics[PayloadAttributesTopic]) {
Contributor

why isn't this part of the lazy reader? will we have to consider any other cases for this down the road?

Contributor Author

I don't know if there will be other instances where we need to tack on payload attributes. It is an odd special case in the first place, in that it is handled in the stream event and not generalized into the state feed.

In most cases of the lazy reader we defer the serialization, but in this case there's a bunch of work we want to do up front - because we want to grab the head right when we get the event - and this has a bunch of error cases to handle. I shunted it here in a hurry because it was the odd case that broke the pattern of how the other lazy readers work as I refactored lazyReaderForEvent, but I do think it could make sense to move it to the cases for these topics under lazyReaderForEvent. I'll try that out and see how it looks.

Contributor Author

updated

go func() {
var err error
kaT := time.NewTimer(es.kaDur)
defer func() {
Contributor

maybe we can merge defers here

Contributor Author

Could you please add a reason for your suggestion?

The code would be functionally the same since defers execute sequentially (LIFO). The reason I wrote it this way is that I like to immediately follow the setup of a thing in need of cleanup with the defer that performs it, i.e. make a timer, then set up the timer's cleanup.
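For illustration only (not the exact code), the idea is:

kaT := time.NewTimer(es.kaDur)
defer kaT.Stop() // cleanup declared immediately next to the setup

sub := subscribeSomething() // hypothetical second resource
defer sub.Unsubscribe()     // likewise; the deferred calls still run LIFO at exit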

case <-kaT.C:
err = es.writeOutbox(nil)
if err != nil {
return
Contributor

if we aren't going to do anything with the errs here should we have a debug log at least behind a log level check? or is it not worth it here?

Contributor Author

agreed we should log

Contributor Author

oops I forgot that it is actually being logged in Cleanup - that's why var err error is predeclared above and passed to Cleanup. I'll add a comment to make that obvious. It's also currently logged as Error, but that should get bumped down to Debug.

Contributor Author

updated comments

},
SignatureSlot: fmt.Sprintf("%d", updateData.Data.SignatureSlot),
},
func (es *eventStreamer) writeOutbox(first lazyReader) error {
Contributor

will need to reread, but can you explain why it's named first here? and how this differs from the rf on the outbox

Contributor Author

first, as in the first event to be written from the outbox to the client. The reason we loop over the outbox within writeOutbox until it empties is that, while we are blocked waiting for the client to read the first event (or any subsequent event in the loop), additional events may have been enqueued. The goal is to avoid unnecessary flushes and keep-alives, and the way we do that is to process all the events as buffered writes, issuing only one flush at the end of the complete batch. Blocking in this method also prevents the calling loop from running, which prevents unnecessary keep-alive messages and flushes.
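Schematically, the method does something like this (a sketch; es.writeEvent and es.w are placeholders for the real helpers, not the PR's exact code):

func (es *eventStreamer) writeOutbox(first lazyReader) error {
    if first != nil {
        if err := es.writeEvent(first); err != nil { // may block while the client reads
            return err
        }
    }
    for {
        select {
        case rf := <-es.outbox:
            // More events were enqueued while we were blocked on the client;
            // write them as buffered writes without flushing yet.
            if err := es.writeEvent(rf); err != nil {
                return err
            }
        default:
            // The outbox is empty: flush the whole batch once and return,
            // letting the caller's loop resume and reset the keep-alive timer.
            es.w.Flush()
            return nil
        }
    }
}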

return send(w, flusher, LightClientFinalityUpdateTopic, update)
case statefeed.LightClientOptimisticUpdate:
if _, ok := requestedTopics[LightClientOptimisticUpdateTopic]; !ok {
written += 1
Contributor

what do we do with this written counter? is it just for the written == check?

Contributor

might be more clear to just keep a boolean

Contributor Author

I had a thought to write a debug log in addition to treating it essentially as a boolean but yeah I'll just switch to boolean.

Contributor Author

updated

StateRoot: hexutil.Encode(event.Data.AttestedHeader.StateRoot),
BodyRoot: hexutil.Encode(event.Data.AttestedHeader.BodyRoot),
},
FinalizedHeader: &BeaconBlockHeader{
Contributor

(you missed BodyRoot)

Contributor Author

It's actually missing in develop in the code I moved this construction from, but good catch!

Contributor Author

updated

return topics
}

func validateTopics(topics []string) (bool, bool, map[string]bool, error) {
Contributor

Can you use named return values to indicate what the two booleans represent?

Contributor Author

I would rather not and rely on the calling code for context. Returning multiple booleans is a bit ugly though, let me see if there's a nicer way to do this.
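For example, one way to avoid the bare booleans is to bundle the results into a small struct (purely a sketch; the field and constructor names are made up, and this is not necessarily what the updated code does):

type topicRequest struct {
    topics        map[string]bool
    needStateFeed bool
    needOpsFeed   bool
}

func newTopicRequest(topics []string) (*topicRequest, error) {
    req := &topicRequest{topics: make(map[string]bool)}
    for _, t := range topics {
        valid := false
        if topicsForStateFeed[t] {
            req.needStateFeed = true
            valid = true
        }
        if topicsForOpsFeed[t] {
            req.needOpsFeed = true
            valid = true
        }
        if !valid {
            return nil, fmt.Errorf("invalid topic: %s", t)
        }
        req.topics[t] = true
    }
    return req, nil
}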

Contributor Author

updated

Comment on lines 105 to 137
if topicsForStateFeed[topic] {
subState = true
requested[topic] = true
continue
}
if topicsForOpsFeed[topic] {
subOps = true
requested[topic] = true
continue
}
Contributor

Suggested change
Original:
    if topicsForStateFeed[topic] {
        subState = true
        requested[topic] = true
        continue
    }
    if topicsForOpsFeed[topic] {
        subOps = true
        requested[topic] = true
        continue
    }
Suggested:
    if topicsForStateFeed[topic] {
        subState = true
    }
    if topicsForOpsFeed[topic] {
        subOps = true
    }
    requested[topic] = true
    continue

Contributor Author

makes sense 👍

Contributor Author

updated

beacon-chain/rpc/eth/events/events.go: 3 outdated review threads (resolved)
}
return nil
f, ok := w.(StreamingResponseWriter)
Contributor

why not just change the parameter type to StreamingResponseWriter?

Contributor Author

That would be a runtime panic, whereas this way results in a runtime error that is safe to handle. This is sort of an odd case because we don't know what the http handler will give us until run time. Can you think of a way to force this to be a compile-time error?
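For context, the handler only receives an http.ResponseWriter, so the check has to happen at run time, roughly like this (a sketch; the handler name is illustrative):

// The handler name here is illustrative.
func (s *Server) handleEventStream(w http.ResponseWriter, r *http.Request) {
    sw, ok := w.(StreamingResponseWriter) // an http.ResponseWriter that also implements http.Flusher
    if !ok {
        // A safe runtime error instead of a panic from a hard type assertion.
        httputil.HandleError(w, "streaming unsupported by this connection", http.StatusInternalServerError)
        return
    }
    _ = sw // from here, hand sw to the event streamer
}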

if err != nil {
return
}
// The timer has already fired here, so a call to Reset is safe.
Contributor

This comment is in the wrong place, we call Reset in the next case

Contributor Author

The reason for this comment is that before go 1.23 the Reset docs used to warn:
For a Timer created with NewTimer, Reset should be invoked only on stopped or expired timers with drained channels.

So I meant to convey that when Reset is called below, it's ok that the code in this case doesn't call Stop first, because we know the timer has fired. That's why in the other case (read from outbox) you see this comment:

We don't know if the timer fired concurrently to this case being ready, so we need to check the return of Stop and drain the timer channel if it fired.

I'm rewriting the comment to make it more clear.

Looks like in go 1.23 they cleaned all this up (and timers without any references can be garbage collected even if the code forgot to call Stop, yay!)
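For reference, the pattern being described is roughly this (a schematic of the loop body, not the exact diff):

select {
case lr := <-es.outbox:
    if err = es.writeOutbox(lr); err != nil {
        return
    }
    // The timer may have fired while we were blocked writing to the client.
    // Before go 1.23, Reset should only be called on a stopped or expired
    // timer with a drained channel, so stop it and drain if needed.
    if !kaT.Stop() {
        <-kaT.C
    }
    kaT.Reset(es.kaDur)
case <-kaT.C:
    if err = es.writeOutbox(nil); err != nil {
        return
    }
    // The timer has already fired and this case consumed the tick,
    // so it is safe to call Reset directly.
    kaT.Reset(es.kaDur)
case <-ctx.Done():
    return
}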

}
for {
select {
case rf := <-es.outbox:
Contributor

I am confused about how things are read from the outbox. You read from it in spawnWriteLoop in a for+select (case lr := <-es.outbox) and then again here in another for+select. Since this function is called from spawnWriteLoop, we are in a nested for loop where both the inner and outer loops read from the outbox (not concurrently, but it's still hard for me to wrap my head around what's going on).

Contributor Author

Copying from my other comment in response to James asking a related question:

The reason we loop over the outbox within writeOutbox until it empties is that, while we are blocked waiting for the client to read the first event (or any subsequent event in the loop), additional events may have been enqueued. The goal is to avoid unnecessary flushes and keep-alives, and the way we do that is to process all the events as buffered writes, issuing only one flush at the end of the complete batch. Blocking in this method also prevents the calling loop from running, which prevents unnecessary keep-alive messages and flushes.

Contributor

it took a few rereads to understand this

@kasey kasey force-pushed the async-event-streamer branch 2 times, most recently from 0e2f0b1 to c2ae175 on September 9, 2024 20:03
@kasey kasey changed the title from "WIP: sse implementation that sheds stuck clients" to "SSE implementation that sheds stuck clients" on Sep 9, 2024
}
// If the client can't keep up, the outbox will eventually completely fill, at which
// safeWrite will error, and we'll hit the below return statement, at which point the deferred
// Unsuscribe calls will be made and the event feed will stop writing to this channel.
Contributor

Suggested change
Original:  // Unsuscribe calls will be made and the event feed will stop writing to this channel.
Suggested: // unsuscribe calls will be made and the event feed will stop writing to this channel.

// Unsuscribe calls will be made and the event feed will stop writing to this channel.
// Since the outbox and event stream channels are separately buffered, the event subscription
// channel should stay relatively empty, which gives this loop time to unsubscribe
// and cleanup before the event stream channel fills and disrupts other readers.
Contributor

Suggested change
Original:  // and cleanup before the event stream channel fills and disrupts other readers.
Suggested: // and clean up before the event stream channel fills and disrupts other readers.

httputil.HandleError(w, msg, http.StatusInternalServerError)
return
}
es, err := NewEventStreamer(eventFeedDepth, s.KeepAliveInterval)
Contributor

nice looks pretty clean

func (es *eventStreamer) StreamEvents(ctx context.Context, w StreamingResponseWriter, req *topicRequest, s *Server) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go es.recvEventLoop(ctx, cancel, req, s)
Contributor

This was way easier to read 👍

@@ -189,15 +198,7 @@ type eventStreamer struct {
keepAlive time.Duration
}

func (es *eventStreamer) streamEvents(ctx context.Context, w StreamingResponseWriter, req *topicRequest, s *Server) error {
Contributor

This was a little easier to read originally; I see that the recvEventLoop is swapped with the outboxWriteLoop as well.

seems like this wrapper would be a nice-to-have IMO


ctx, cancel := context.WithCancel(ctx)
defer cancel()
api.SetSSEHeaders(w)
Contributor

seems like it probably doesn't make a difference but maybe using sw will look more consistent.
