#374 - allow concurrent consumption of events #29

Open · wants to merge 14 commits into `master`
70 changes: 70 additions & 0 deletions README.md
@@ -19,6 +19,76 @@ Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

## Message consumption models

The library complies with the Watermill `Subscriber` interface:

```go
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}
```

However, under the hood it supports several consumption models:
- one in-flight message (default)
- batch consumption
- partition concurrent

### One in-flight message (default)

This is the **default message consumption model**. In this model, when the subscription channel returns a message, it will not return another one until that message is ACKed or NACKed.

When a message is ACKed, the next one (if any) is pushed to the channel, and the partition offset is updated if there is a consumer group session.

This model has the advantage of being simple and of naturally preserving ordering.
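
For illustration only, a minimal consumption loop under this model could look like the sketch below (the topic name and the handling logic are placeholders; `subscriber` is any implementation of the interface above):

```go
// A minimal sketch of the default model: only one message is in flight,
// so the next message is not delivered until Ack or Nack is called.
func process(ctx context.Context, subscriber message.Subscriber) error {
	messages, err := subscriber.Subscribe(ctx, "example.topic")
	if err != nil {
		return err
	}
	for msg := range messages {
		// handle msg.Payload here; ordering is preserved
		msg.Ack() // unblocks delivery of the next message
	}
	return nil
}
```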

### Batch consumption

While the default model is simple to understand and safe, a greater degree of parallelism is sometimes required. For example:
- if you use a partitioner based on the message key, you can expect a partial order: messages are not sorted overall, but they are sorted within each partition. In that case you can process multiple messages at once, and the default model falls short
- you simply do not care about ordering

In this model, the consumer can configure a `maxBatchSize` and a `maxWaitTime`. The subscriber waits until `maxBatchSize` messages are ready or `maxWaitTime` has elapsed.

It then pushes those messages onto the subscription channel. This means a consumer can receive multiple messages without having to ACK or NACK the previously received ones.

This model handles ACKs and NACKs properly by resetting the offset of each (topic, partition) tuple to the last
message ACKed before a NACK for that (topic, partition) arrived.

Some examples:
- all messages ACKed: the offset of the latest message is marked as done
- first message ACKed and second NACKed: the offset of the first message is marked as done and the second message is resent

To configure it:

```go
kafka.SubscriberConfig{
// ... other settings here
ConsumerModel: kafka.Default,
BatchConsumerConfig: &kafka.BatchConsumerConfig{
MaxBatchSize: 10,
MaxWaitTime: 100 * time.Millisecond,
},
}
```
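
As an illustration only (the `handle` function is a placeholder, not part of the library), a consumer using this model can hold several messages in flight at once and ACK or NACK them independently:

```go
// With the batch model, up to maxBatchSize messages can be delivered before
// any of them is ACKed, so each one can be acknowledged independently.
func processBatches(ctx context.Context, subscriber message.Subscriber) error {
	messages, err := subscriber.Subscribe(ctx, "example.topic")
	if err != nil {
		return err
	}
	for msg := range messages {
		go func(m *message.Message) {
			if err := handle(m); err != nil { // handle is a placeholder for business logic
				m.Nack() // the (topic, partition) offset is rewound to the last ACKed message
				return
			}
			m.Ack()
		}(msg)
	}
	return nil
}
```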

### Partition Concurrent

Partition concurrent works similarly to the default consumption model. The main difference is that it allows up to N in-flight messages, where N is the number of partitions.
This allows higher processing concurrency while still easily preserving per-partition order.

To configure it:

```go
kafka.SubscriberConfig{
// ... other settings here
ConsumerModel: kafka.PartitionConcurrent,
}
```
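
For illustration (again with a placeholder handler), processing each message in its own goroutine works naturally here, because at most one message per partition is in flight and per-partition order is therefore preserved:

```go
// With partition-concurrent consumption, up to one message per partition is in
// flight, so the number of concurrent handlers is bounded by the partition count.
func processConcurrently(ctx context.Context, subscriber message.Subscriber) error {
	messages, err := subscriber.Subscribe(ctx, "example.topic")
	if err != nil {
		return err
	}
	for msg := range messages {
		msg := msg
		go func() {
			handle(msg) // placeholder for business logic
			msg.Ack()   // allows the next message from this partition to be delivered
		}()
	}
	return nil
}
```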

## Contributing

All contributions are very much welcome. If you'd like to help with Watermill development,
4 changes: 4 additions & 0 deletions go.mod
@@ -5,13 +5,15 @@ go 1.17
require (
github.com/IBM/sarama v1.42.1
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0
github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4
github.com/hashicorp/go-multierror v1.1.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
)

require (
github.com/Shopify/sarama v1.38.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
@@ -33,6 +35,8 @@ require (
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.31.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
69 changes: 69 additions & 0 deletions go.sum

Large diffs are not rendered by default.

192 changes: 192 additions & 0 deletions pkg/kafka/batch_message_handler.go
@@ -0,0 +1,192 @@
package kafka

import (
"context"
"fmt"
"time"

"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

// batchedMessageHandler works by fetching up to maxBatchSize messages from the provided channel,
// waiting at most maxWaitTime for them to arrive.
// It then pushes the collected Kafka messages, in order, to outputChannel.
// Once all of them have been ACKed or NACKed, it marks the highest ACKed offset
// for each involved partition.
type batchedMessageHandler struct {
outputChannel chan<- *message.Message

maxBatchSize int16
maxWaitTime time.Duration

nackResendSleep time.Duration

logger watermill.LoggerAdapter
closing chan struct{}
messageParser messageParser
messages chan *messageHolder
}

func NewBatchedMessageHandler(
outputChannel chan<- *message.Message,
unmarshaler Unmarshaler,
logger watermill.LoggerAdapter,
closing chan struct{},
maxBatchSize int16,
maxWaitTime time.Duration,
nackResendSleep time.Duration,
) MessageHandler {
handler := &batchedMessageHandler{
outputChannel: outputChannel,
maxBatchSize: maxBatchSize,
maxWaitTime: maxWaitTime,
nackResendSleep: nackResendSleep,
closing: closing,
logger: logger,
messageParser: messageParser{
unmarshaler: unmarshaler,
},
messages: make(chan *messageHolder),
}
go handler.startProcessing()
return handler
}

func (h *batchedMessageHandler) startProcessing() {
buffer := make([]*messageHolder, 0, h.maxBatchSize)
mustSleep := h.nackResendSleep != NoSleep
logFields := watermill.LogFields{}
sendDeadline := time.Now().Add(h.maxWaitTime)
timer := time.NewTimer(h.maxWaitTime)
for {
timerExpired := false
select {
case message, ok := <-h.messages:
if !ok {
	h.logger.Debug("Messages channel is closed", logFields)
	return
}
buffer = append(buffer, message)
case <-timer.C:
if len(buffer) > 0 {
h.logger.Trace("Timer expired, sending already fetched messages.", logFields)
}
timerExpired = true
case <-h.closing:
h.logger.Debug("Subscriber is closing, stopping messageHandler", logFields)
return
}
size := len(buffer)
if timerExpired || size == int(h.maxBatchSize) {
if size > 0 {
newBuffer, err := h.processBatch(buffer)
if err != nil {
return
}
if newBuffer == nil {
return
}
buffer = newBuffer
// if there are messages left in the buffer, it means there were NACKs, so we wait
if len(buffer) > 0 && mustSleep {
time.Sleep(h.nackResendSleep)
}
}
sendDeadline = time.Now().Add(h.maxWaitTime)
timerExpired = false
}
timer.Reset(time.Until(sendDeadline))
}
}

func (h *batchedMessageHandler) ProcessMessages(
ctx context.Context,
kafkaMessages <-chan *sarama.ConsumerMessage,
sess sarama.ConsumerGroupSession,
logFields watermill.LogFields,
) error {
for {
select {
case kafkaMsg := <-kafkaMessages:
if kafkaMsg == nil {
h.logger.Debug("kafkaMsg is closed, stopping ProcessMessages", logFields)
return nil
}
msg, err := h.messageParser.prepareAndProcessMessage(ctx, kafkaMsg, h.logger, logFields, sess)
if err != nil {
return err
}
h.messages <- msg
case <-h.closing:
h.logger.Debug("Subscriber is closing, stopping messageHandler", logFields)
return nil
case <-ctx.Done():
h.logger.Debug("Ctx was cancelled, stopping messageHandler", logFields)
return nil
}
}
}

func (h *batchedMessageHandler) processBatch(
buffer []*messageHolder,
) ([]*messageHolder, error) {
waitChannels := make([]<-chan bool, 0, len(buffer))
for _, msgHolder := range buffer {
ctx, cancelCtx := context.WithCancel(msgHolder.message.Context())
msgHolder.message.SetContext(ctx)
select {
case h.outputChannel <- msgHolder.message:
h.logger.Trace("Message sent to consumer", msgHolder.logFields)
waitChannels = append(waitChannels, waitForMessage(ctx, h.logger, msgHolder, cancelCtx))
case <-h.closing:
h.logger.Trace("Closing, message discarded", msgHolder.logFields)
defer cancelCtx()
return nil, nil
case <-ctx.Done():
h.logger.Trace("Closing, ctx cancelled before message was sent to consumer", msgHolder.logFields)
defer cancelCtx()
return nil, nil
}
}

// we wait for all the messages to be ACKed or NACKed
// and we store for each partition the last message that was ACKed so we
// can mark the latest complete offset
lastCommittableMessages := make(map[string]*messageHolder)
nackedPartitions := make(map[string]struct{})
newBuffer := make([]*messageHolder, 0, h.maxBatchSize)
for idx, waitChannel := range waitChannels {
msgHolder := buffer[idx]
h.logger.Trace("Waiting for message to be acked", msgHolder.logFields)
ack, ok := <-waitChannel
h.logger.Info("Received ACK / NACK response or closed", msgHolder.logFields)
// it was aborted
if !ok {
h.logger.Info("Returning as messages were closed", msgHolder.logFields)
return nil, nil
}
topicAndPartition := fmt.Sprintf("%s-%d", msgHolder.kafkaMessage.Topic, msgHolder.kafkaMessage.Partition)
_, partitionNacked := nackedPartitions[topicAndPartition]
if !ack || partitionNacked {
newBuffer = append(newBuffer, msgHolder.Copy())
nackedPartitions[topicAndPartition] = struct{}{}
continue
}
lastCommittableMessages[topicAndPartition] = msgHolder
}

// If a session is provided, we mark the latest committable message for
// each partition as done. This is required because, if we marked nothing, we might re-process
// messages unnecessarily, and if we marked the latest message of the whole batch, we could lose NACKed messages.
for _, lastCommittable := range lastCommittableMessages {
	if lastCommittable.sess != nil {
		h.logger.Trace("Marking offset as complete for", lastCommittable.logFields)
		lastCommittable.sess.MarkMessage(lastCommittable.kafkaMessage, "")
	}
}

return newBuffer, nil
}
73 changes: 73 additions & 0 deletions pkg/kafka/batch_message_handler_bench_test.go
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func BenchmarkBatchMessageHandler(b *testing.B) {
b.Run("consumer group session is provided", func(b *testing.B) {
benchmarkBatchMessageHandler(b, true)
})

b.Run("no consumer group session", func(b *testing.B) {
benchmarkBatchMessageHandler(b, true)
})
}

func benchmarkBatchMessageHandler(b *testing.B, hasConsumerGroup bool) {
testConfig := testConfig{
batchWait: 100 * time.Millisecond,
maxBatchSize: 10,
hasConsumerGroup: hasConsumerGroup,
hasCountingConsumerGroup: false,
}

testBenchmark(b, testConfig, testBatchConsumption)
}

func testBenchmark(
b *testing.B,
testConfig testConfig,
buildHandler func(testConfig) (chan<- struct{}, chan *message.Message, MessageHandler),
) {
messagesInTest := 10_000
sess, _ := consumerGroupSession(testConfig)
kafkaMessages := make(chan *sarama.ConsumerMessage, messagesInTest)
defer close(kafkaMessages)
messagesToSend := make([]*sarama.ConsumerMessage, 0, messagesInTest)
for i := 0; i < messagesInTest; i++ {
msg := generateMessage("topic1", i%5, i/5)
messagesToSend = append(messagesToSend, msg)
kafkaMessages <- msg
}
closing, outputChannel, handler := buildHandler(
testConfig,
)
defer close(closing)
defer close(outputChannel)
go func() {
err := handler.ProcessMessages(context.Background(), kafkaMessages, sess, watermill.LogFields{})
assert.NoError(b, err)
}()
receivedMessages := make([]*message.Message, 0, messagesInTest)
for len(receivedMessages) < messagesInTest {
	msg, ok := <-outputChannel
	require.True(b, ok, "channel closed earlier than expected")
	receivedMessages = append(receivedMessages, msg)
	msg.Ack()
}
testSameMessagesAndLocalOrder(b, receivedMessages, messagesToSend)
}