Skip to content

Commit

Permalink
Fix data loss bug in the experimental ingest storage when a Kafka Fet…
Browse files Browse the repository at this point in the history
…ch is split into multiple requests and some of them return an error (#9963)

* WIP

Signed-off-by: Marco Pracucci <[email protected]>

* Reproduced the bug

Signed-off-by: Marco Pracucci <[email protected]>

* Added TestPartitionReader_ShouldNotMissRecordsIfFetchRequestContainPartialFailuresWithConcurrentFetcherIsUsed()

Signed-off-by: Marco Pracucci <[email protected]>

* Make it less verbose

Signed-off-by: Marco Pracucci <[email protected]>

* Added more TODOs

Signed-off-by: Marco Pracucci <[email protected]>

* Improvements

Signed-off-by: Marco Pracucci <[email protected]>

* Improved tests

Signed-off-by: Marco Pracucci <[email protected]>

* Improved error checking in parseFetchResponse()

Signed-off-by: Marco Pracucci <[email protected]>

* Split newEmptyFetchResult() into newEmptyFetchResult() and newErrorFetchResult()

Signed-off-by: Marco Pracucci <[email protected]>

* Unit tested fetchSingle()

Signed-off-by: Marco Pracucci <[email protected]>

* Make the new test much faster

Signed-off-by: Marco Pracucci <[email protected]>

* Fix linter

Signed-off-by: Marco Pracucci <[email protected]>

* Add CHANGELOG entry

Signed-off-by: Marco Pracucci <[email protected]>

* Updated comment

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Nov 20, 2024
1 parent f40dbab commit 96c92c9
Show file tree
Hide file tree
Showing 7 changed files with 609 additions and 71 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
* [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708
* [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765
* [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765
* [BUGFIX] Ingester: fix bug in concurrent fetching where a failure to list topics on startup would cause to use an invalid topic ID (0x00000000000000000000000000000000). #9883
* [BUGFIX] Ingester: Fix bug in concurrent fetching where a failure to list topics on startup would cause to use an invalid topic ID (0x00000000000000000000000000000000). #9883
* [BUGFIX] Ingester: Fix data loss bug in the experimental ingest storage when a Kafka Fetch is split into multiple requests and some of them return an error. #9963
* [BUGFIX] PromQL: `round` now removes the metric name again. #9879

### Mixin
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
)

Expand Down Expand Up @@ -39,6 +40,12 @@ var (
ErrInvalidIngestionConcurrencyParams = errors.New("ingest-storage.kafka.ingestion-concurrency-queue-capacity, ingest-storage.kafka.ingestion-concurrency-estimated-bytes-per-sample, ingest-storage.kafka.ingestion-concurrency-batch-size and ingest-storage.kafka.ingestion-concurrency-target-flushes-per-shard must be greater than 0")

consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp}

defaultFetchBackoffConfig = backoff.Config{
MinBackoff: 250 * time.Millisecond,
MaxBackoff: 2 * time.Second,
MaxRetries: 0, // Retry forever. Do NOT change!
}
)

type Config struct {
Expand Down Expand Up @@ -125,13 +132,19 @@ type KafkaConfig struct {
// IngestionConcurrencyEstimatedBytesPerSample is the estimated number of bytes per sample.
// Our data indicates that the average sample size is somewhere between ~250 and ~500 bytes. We'll use 500 bytes as a conservative estimate.
IngestionConcurrencyEstimatedBytesPerSample int `yaml:"ingestion_concurrency_estimated_bytes_per_sample"`

// The fetch backoff config to use in the concurrent fetchers (when enabled). This setting
// is just used to change the default backoff in tests.
concurrentFetchersFetchBackoffConfig backoff.Config `yaml:"-"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.concurrentFetchersFetchBackoffConfig = defaultFetchBackoffConfig

f.StringVar(&cfg.Address, prefix+".address", "", "The Kafka backend address.")
f.StringVar(&cfg.Topic, prefix+".topic", "", "The Kafka topic name.")
f.StringVar(&cfg.ClientID, prefix+".client-id", "", "The Kafka client ID.")
Expand Down
Loading

0 comments on commit 96c92c9

Please sign in to comment.