Skip to content

Commit

Permalink
Ingest storage: configure BrokerMaxReadBytes on Kafka reader (#7148)
Browse files Browse the repository at this point in the history
* Ingest storage: configure BrokerMaxReadBytes on Kafka reader

Signed-off-by: Marco Pracucci <[email protected]>

* Update pkg/storage/ingest/reader.go

Co-authored-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
pracucci and dimitarvdimitrov authored Jan 17, 2024
1 parent ac3cb58 commit 5c37e4e
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,22 @@ func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) {
}

func (r *PartitionReader) newKafkaReader(at kgo.Offset) (*kgo.Client, error) {
const fetchMaxBytes = 100_000_000

opts := append(
commonKafkaClientOptions(r.kafkaCfg, r.metrics.kprom, r.logger),
kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
r.kafkaCfg.Topic: {r.partitionID: at},
}),
kgo.FetchMinBytes(1),
kgo.FetchMaxBytes(100_000_000),
kgo.FetchMaxBytes(fetchMaxBytes),
kgo.FetchMaxWait(5*time.Second),
kgo.FetchMaxPartitionBytes(50_000_000),

// BrokerMaxReadBytes sets the maximum response size that can be read from
// Kafka. This is a safety measure to avoid OOMing on invalid responses.
// franz-go recommendation is to set it 2x FetchMaxBytes.
kgo.BrokerMaxReadBytes(2*fetchMaxBytes),
)
client, err := kgo.NewClient(opts...)
if err != nil {
Expand Down

0 comments on commit 5c37e4e

Please sign in to comment.