From b10d3fb96a736322e998603adc9887b373a48eeb Mon Sep 17 00:00:00 2001 From: Patryk Prus Date: Thu, 6 Mar 2025 20:12:57 -0500 Subject: [PATCH] Upgrade mimir-prometheus (#10824) * Upgrade mimir-prometheus * Update CHANGELOG --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../prometheus/prometheus/tsdb/head.go | 67 ++++++++++++------- .../prometheus/prometheus/tsdb/head_wal.go | 4 +- .../prometheus/tsdb/wlog/checkpoint.go | 6 +- vendor/modules.txt | 4 +- 7 files changed, 55 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0a69acab59..c3895a67b0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ * [BUGFIX] Distributor: Report partially converted OTLP requests with status 400 Bad Request. #10588 * [BUGFIX] Ruler: fix issue where rule evaluations could be missed while shutting down a ruler instance if that instance owns many rule groups. prometheus/prometheus#15804 #10762 * [BUGFIX] Ingester: Add additional check on reactive limiter queue sizes. #10722 +* [BUGFIX] TSDB: fix unknown series errors and possible lost data during WAL replay when series are removed from the head due to inactivity and reappear before the next WAL checkpoint. https://github.com/prometheus/prometheus/pull/16060 #10824 ### Mixin diff --git a/go.mod b/go.mod index 06cf83915ec..57470ac4711 100644 --- a/go.mod +++ b/go.mod @@ -298,7 +298,7 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250305224633-8c45fc54920d +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250306234455-f6f6f2cceada // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet: diff --git a/go.sum b/go.sum index 3608c1f35c7..a0b20952eaf 100644 --- a/go.sum +++ b/go.sum @@ -1284,8 +1284,8 @@ github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1 h1:vR5nELq+KtGO github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20250305224633-8c45fc54920d h1:ff6cIM9Z2ew3nbXVjwEttZFdIEAe6X1lMfMc+xCIjKM= -github.com/grafana/mimir-prometheus v0.0.0-20250305224633-8c45fc54920d/go.mod h1:jC5V3PuoN3nxpvsvZipB+iOf6H/Np1uW+e3r9TTxJMA= +github.com/grafana/mimir-prometheus v0.0.0-20250306234455-f6f6f2cceada h1:8MLoP1fblwE72Bk4G66nmhXwoHDcpHQcfjrC+kLoXAg= +github.com/grafana/mimir-prometheus v0.0.0-20250306234455-f6f6f2cceada/go.mod h1:jC5V3PuoN3nxpvsvZipB+iOf6H/Np1uW+e3r9TTxJMA= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20250211112812-e32be5e2a455 h1:yidC1xzk4fedLZ/iXEqSJopkw3jPZPwoMqqzue4eFEA= diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head.go b/vendor/github.com/prometheus/prometheus/tsdb/head.go index 8db86e78a0d..0fb7a1605a9 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head.go @@ -96,8 +96,8 @@ type Head struct { // All series addressable by their ID or hash. series *stripeSeries - deletedMtx sync.Mutex - deleted map[chunks.HeadSeriesRef]int // Deleted series, and what WAL segment they must be kept until. + walExpiriesMtx sync.Mutex + walExpiries map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until. // TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings. postings *index.MemPostings // Postings lists for terms. @@ -362,7 +362,7 @@ func (h *Head) resetInMemoryState() error { h.exemplars = es h.postings = index.NewUnorderedMemPostings() h.tombstones = tombstones.NewMemTombstones() - h.deleted = map[chunks.HeadSeriesRef]int{} + h.walExpiries = map[chunks.HeadSeriesRef]int{} h.chunkRange.Store(h.opts.ChunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) @@ -785,7 +785,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil { + if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil { return fmt.Errorf("backfill checkpoint: %w", err) } h.updateWALReplayStatusRead(startFrom) @@ -818,7 +818,7 @@ func (h *Head) Init(minValidTime int64) error { if err != nil { return fmt.Errorf("segment reader (offset=%d): %w", offset, err) } - err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks) + err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt) if err := sr.Close(); err != nil { h.logger.Warn("Error while closing the wal segments reader", "err", err) } @@ -1285,6 +1285,34 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) return false, false, 0 } +func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() + + keepUntil, ok := h.walExpiries[id] + return keepUntil, ok +} + +func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) { + h.walExpiriesMtx.Lock() + defer h.walExpiriesMtx.Unlock() + + h.walExpiries[id] = keepUntil +} + +// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint +// last is the last WAL segment that was considered for checkpointing. +func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool { + // Keep the record if the series exists in the head. + if h.series.getByID(id) != nil { + return true + } + + // Keep the record if the series has an expiry set. + keepUntil, ok := h.getWALExpiry(id) + return ok && keepUntil > last +} + // truncateWAL removes old data before mint from the WAL. func (h *Head) truncateWAL(mint int64) error { h.chunkSnapshotMtx.Lock() @@ -1318,17 +1346,8 @@ func (h *Head) truncateWAL(mint int64) error { return nil } - keep := func(id chunks.HeadSeriesRef) bool { - if h.series.getByID(id) != nil { - return true - } - h.deletedMtx.Lock() - keepUntil, ok := h.deleted[id] - h.deletedMtx.Unlock() - return ok && keepUntil > last - } h.metrics.checkpointCreationTotal.Inc() - if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { + if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil { h.metrics.checkpointCreationFail.Inc() var cerr *chunks.CorruptionErr if errors.As(err, &cerr) { @@ -1343,15 +1362,15 @@ func (h *Head) truncateWAL(mint int64) error { h.logger.Error("truncating segments failed", "err", err) } - // The checkpoint is written and segments before it is truncated, so we no - // longer need to track deleted series that are before it. - h.deletedMtx.Lock() - for ref, segment := range h.deleted { + // The checkpoint is written and segments before it is truncated, so stop + // tracking expired series. + h.walExpiriesMtx.Lock() + for ref, segment := range h.walExpiries { if segment <= last { - delete(h.deleted, ref) + delete(h.walExpiries, ref) } } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil { @@ -1618,7 +1637,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { if h.wal != nil { _, last, _ := wlog.Segments(h.wal.Dir()) - h.deletedMtx.Lock() + h.walExpiriesMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with // this ref ID. If we didn't keep these series records then @@ -1626,9 +1645,9 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // that reads the WAL, wouldn't be able to use those // samples since we would have no labels for that ref ID. for ref := range deleted { - h.deleted[chunks.HeadSeriesRef(ref)] = last + h.walExpiries[chunks.HeadSeriesRef(ref)] = last } - h.deletedMtx.Unlock() + h.walExpiriesMtx.Unlock() } return actualInOrderMint, minOOOTime, minMmapFile diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go b/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go index 0cc56256d49..18260d97a78 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head_wal.go @@ -52,7 +52,7 @@ type histogramRecord struct { fh *histogram.FloatHistogram } -func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { +func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs atomic.Uint64 @@ -247,6 +247,8 @@ Outer: } if !created { multiRef[walSeries.Ref] = mSeries.ref + // Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints. + h.setWALExpiry(walSeries.Ref, lastSegment) } idx := uint64(mSeries.ref) % uint64(concurrency) diff --git a/vendor/github.com/prometheus/prometheus/tsdb/wlog/checkpoint.go b/vendor/github.com/prometheus/prometheus/tsdb/wlog/checkpoint.go index 5c607d70302..2c1b0c0534d 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/wlog/checkpoint.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/wlog/checkpoint.go @@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser @@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Drop irrelevant series in place. repl := series[:0] for _, s := range series { - if keep(s.Ref) { + if keep(s.Ref, to) { repl = append(repl, s) } } @@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He // Only keep reference to the latest found metadata for each refID. repl := 0 for _, m := range metadata { - if keep(m.Ref) { + if keep(m.Ref, to) { if _, ok := latestMetadataMap[m.Ref]; !ok { repl++ } diff --git a/vendor/modules.txt b/vendor/modules.txt index e337cb45f4f..74273bc5b34 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1076,7 +1076,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250305224633-8c45fc54920d +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20250306234455-f6f6f2cceada ## explicit; go 1.22.7 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1761,7 +1761,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250305224633-8c45fc54920d +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250306234455-f6f6f2cceada # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094 # github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240531075221-3685f1377d7b