Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r332] Use mimir-prometheus with index planning #10827

Draft
wants to merge 10 commits into
base: r332
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component v0.116.0 // indirect
Expand Down Expand Up @@ -298,7 +299,7 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250302213708-bd234c29eed4
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20250307115605-76de169c0ae4

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,8 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -1284,8 +1286,8 @@ github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1 h1:vR5nELq+KtGO
github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20250302213708-bd234c29eed4 h1:z3cxARHOrF+l39pYXGhQ5ykMy35VP6xpavZHSeCv6Bw=
github.com/grafana/mimir-prometheus v0.0.0-20250302213708-bd234c29eed4/go.mod h1:TRDP3hIlMiItiCmzGthWfWxgsltR8keKOQW0MmUfkKk=
github.com/grafana/mimir-prometheus v0.0.0-20250307115605-76de169c0ae4 h1:J7lenrAXBCAPDLJqetZikQOrngJYHrwNWWbDfIUjbaM=
github.com/grafana/mimir-prometheus v0.0.0-20250307115605-76de169c0ae4/go.mod h1:GvDB7ooVHnRILyNkG4GlUWMAIjrEiNNkWB0K2WgUqr4=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20250211112812-e32be5e2a455 h1:yidC1xzk4fedLZ/iXEqSJopkw3jPZPwoMqqzue4eFEA=
Expand Down Expand Up @@ -1761,6 +1763,8 @@ github.com/twmb/franz-go/plugin/kotel v1.5.0 h1:TiPfGUbQK384OO7ZYGdo7JuPCbJn+/8n
github.com/twmb/franz-go/plugin/kotel v1.5.0/go.mod h1:wRXzRo76x1myOUMaVHAyraXoGBdEcvlLChGTVv5+DWU=
github.com/twmb/franz-go/plugin/kprom v1.1.0 h1:grGeIJbm4llUBF8jkDjTb/b8rKllWSXjMwIqeCCcNYQ=
github.com/twmb/franz-go/plugin/kprom v1.1.0/go.mod h1:cTDrPMSkyrO99LyGx3AtiwF9W6+THHjZrkDE2+TEBIU=
github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 h1:QEePdg0ty2r0t1+qwfZmQ4OOl/MB2UXIeJSpIZv56lg=
github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43/go.mod h1:OYRfF6eb5wY9VRFkXJH8FFBi3plw2v+giaIu7P054pM=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
Expand Down
18 changes: 17 additions & 1 deletion integration/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package integration
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestCompactBlocksContainingNativeHistograms(t *testing.T) {
chkReader, err := chunks.NewDirReader(filepath.Join(outDir, blockID, block.ChunksDirname), nil)
require.NoError(t, err)

ixReader, err := index.NewFileReader(filepath.Join(outDir, blockID, block.IndexFilename), index.DecodePostingsRaw)
ixReader, err := index.NewFileReader(filepath.Join(outDir, blockID, block.IndexFilename), index.DecodePostingsRaw, emptyStats{})
require.NoError(t, err)

n, v := index.AllPostingsKey()
Expand Down Expand Up @@ -190,6 +191,21 @@ func TestCompactBlocksContainingNativeHistograms(t *testing.T) {
require.Equal(t, expectedSeries, compactedSeries)
}

type emptyStats struct {
}

func (e emptyStats) TotalSeries() int64 {
return 0
}

func (e emptyStats) LabelValuesCount(context.Context, string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func (e emptyStats) LabelValuesCardinality(context.Context, string, ...string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func isMarkedForDeletionDueToCompaction(t *testing.T, blockPath string) bool {
deletionMarkFilePath := filepath.Join(blockPath, block.DeletionMarkFilename)
b, err := os.ReadFile(deletionMarkFilePath)
Expand Down
14 changes: 12 additions & 2 deletions pkg/compactor/split_merge_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,16 +784,26 @@ func TestMultitenantCompactor_ShouldGuaranteeSeriesShardingConsistencyOverTheTim
require.NoError(t, err)

// Find all series in the block.
postings, err := indexReader.PostingsForMatchers(ctx, false, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
postings, pendingMatchers, err := indexReader.PostingsForMatchers(ctx, false, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
require.NoError(t, err)

builder := labels.NewScratchBuilder(1)
for postings.Next() {
// Symbolize the series labels.
require.NoError(t, indexReader.Series(postings.At(), &builder, nil))
seriesLabels := builder.Labels()
allMatch := true
for _, m := range pendingMatchers {
if !m.Matches(seriesLabels.Get(m.Name)) {
allMatch = false
break
}
}
if !allMatch {
continue
}

// Ensure the series below to the right shard.
seriesLabels := builder.Labels()
seriesID, err := strconv.Atoi(seriesLabels.Get("series_id"))
require.NoError(t, err)
assert.Contains(t, expectedSeriesIDs, seriesID, "series:", seriesLabels.String())
Expand Down
36 changes: 26 additions & 10 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
}

isNativeHistogram := request.GetType() == client.NATIVE_HISTOGRAM_SERIES
postings, err := getPostings(ctx, db, idx, matchers, isNativeHistogram)
postings, pendingMatchers, err := getPostings(ctx, db, idx, matchers, isNativeHistogram)
if err != nil {
return fmt.Errorf("error listing active series: %w", err)
}
Expand All @@ -79,7 +79,19 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
}
return fmt.Errorf("error getting series: %w", err)
}
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(buf.Labels())}
lbsl := buf.Labels()
allMatch := true
for _, m := range pendingMatchers {
if !m.Matches(lbsl.Get(m.Name)) {
allMatch = false
break
}
}
if !allMatch {
continue
}

m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(lbsl)}
mSize := m.Size()
if isNativeHistogram {
mSize += 8 // 8 bytes for the bucket count.
Expand Down Expand Up @@ -110,30 +122,30 @@ func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream clie
return nil
}

func getPostings(ctx context.Context, db *userTSDB, idx tsdb.IndexReader, matchers []*labels.Matcher, isNativeHistogram bool) (activeseries.BucketCountPostings, error) {
func getPostings(ctx context.Context, db *userTSDB, idx tsdb.IndexReader, matchers []*labels.Matcher, isNativeHistogram bool) (activeseries.BucketCountPostings, []*labels.Matcher, error) {
if db.activeSeries == nil {
return nil, fmt.Errorf("active series tracker is not initialized")
return nil, nil, fmt.Errorf("active series tracker is not initialized")
}

shard, matchers, err := sharding.RemoveShardFromMatchers(matchers)
if err != nil {
return nil, fmt.Errorf("error removing shard matcher: %w", err)
return nil, nil, fmt.Errorf("error removing shard matcher: %w", err)
}

postings, err := tsdb.PostingsForMatchers(ctx, idx, matchers...)
postings, pendingMatchers, err := tsdb.PostingsForMatchers(ctx, idx, matchers...)
if err != nil {
return nil, fmt.Errorf("error getting postings: %w", err)
return nil, nil, fmt.Errorf("error getting postings: %w", err)
}

if shard != nil {
postings = idx.ShardedPostings(postings, shard.ShardIndex, shard.ShardCount)
}

if isNativeHistogram {
return activeseries.NewNativeHistogramPostings(db.activeSeries, postings), nil
return activeseries.NewNativeHistogramPostings(db.activeSeries, postings), pendingMatchers, nil
}

return &ZeroBucketCountPostings{*activeseries.NewPostings(db.activeSeries, postings)}, nil
return &ZeroBucketCountPostings{*activeseries.NewPostings(db.activeSeries, postings)}, pendingMatchers, nil
}

// listActiveSeries is used for testing purposes, builds the whole array of active series in memory.
Expand All @@ -142,10 +154,14 @@ func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matc
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}
postings, err := getPostings(ctx, db, idx, matchers, false)
ctx = context.WithValue(ctx, "disable_optimized_index_lookup", true)
postings, pendingMatchers, err := getPostings(ctx, db, idx, matchers, false)
if err != nil {
return nil, err
}
if len(pendingMatchers) > 0 {
return nil, fmt.Errorf("pending matchers: %v", pendingMatchers)
}
return NewSeries(postings, idx), nil
}

Expand Down
13 changes: 11 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2138,13 +2138,22 @@ func (i *Ingester) LabelValuesCardinality(req *client.LabelValuesCardinalityRequ
var postingsForMatchersFn func(context.Context, tsdb.IndexPostingsReader, ...*labels.Matcher) (index.Postings, error)
switch req.GetCountMethod() {
case client.IN_MEMORY:
postingsForMatchersFn = tsdb.PostingsForMatchers
postingsForMatchersFn = func(ctx context.Context, reader tsdb.IndexPostingsReader, matcher ...*labels.Matcher) (index.Postings, error) {
postings, pendingMatchers, err := tsdb.PostingsForMatchers(context.WithValue(ctx, "disable_optimized_index_lookup", true), reader, matcher...)
if len(pendingMatchers) > 0 {
return nil, fmt.Errorf("unsupported pending matchers %v", pendingMatchers)
}
return postings, err
}
case client.ACTIVE:
postingsForMatchersFn = func(ctx context.Context, ix tsdb.IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
postings, err := tsdb.PostingsForMatchers(ctx, ix, ms...)
postings, pendingMatchers, err := tsdb.PostingsForMatchers(context.WithValue(ctx, "disable_optimized_index_lookup", true), ix, ms...)
if err != nil {
return nil, err
}
if len(pendingMatchers) > 0 {
return nil, fmt.Errorf("unsupported pending matchers %v", pendingMatchers)
}
return activeseries.NewPostings(db.activeSeries, postings), nil
}
default:
Expand Down
18 changes: 17 additions & 1 deletion pkg/storage/tsdb/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,21 @@ func (n *minMaxSumInt64) Avg() int64 {
return n.sum / n.cnt
}

type emptyStats struct {
}

func (e emptyStats) TotalSeries() int64 {
return 0
}

func (e emptyStats) LabelValuesCount(context.Context, string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func (e emptyStats) LabelValuesCardinality(context.Context, string, ...string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

// GatherBlockHealthStats returns useful counters as well as outsider chunks (chunks outside of block time range) that
// helps to assess index and optionally chunk health.
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
Expand All @@ -218,7 +233,8 @@ func GatherBlockHealthStats(ctx context.Context, logger log.Logger, blockDir str
indexFn := filepath.Join(blockDir, IndexFilename)
chunkDir := filepath.Join(blockDir, ChunksDirname)
// index reader
r, err := index.NewFileReader(indexFn, index.DecodePostingsRaw)
// TODO dimitarvdimitrov do we need to fix this?
r, err := index.NewFileReader(indexFn, index.DecodePostingsRaw, emptyStats{})
if err != nil {
return stats, errors.Wrap(err, "open index file")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/tsdb/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRewrite(t *testing.T) {
}, 150, 0, 1000, labels.EmptyLabels())
require.NoError(t, err)

ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename), index.DecodePostingsRaw)
ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename), index.DecodePostingsRaw, emptyStats{})
require.NoError(t, err)

defer func() { require.NoError(t, ir.Close()) }()
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestRewrite(t *testing.T) {
require.NoError(t, iw.Close())
require.NoError(t, cw.Close())

ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename), index.DecodePostingsRaw)
ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename), index.DecodePostingsRaw, emptyStats{})
require.NoError(t, err)

defer func() { require.NoError(t, ir2.Close()) }()
Expand Down
17 changes: 16 additions & 1 deletion pkg/storegateway/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,25 @@ func TestReadersComparedToIndexHeader(t *testing.T) {

}

type emptyStats struct {
}

func (e emptyStats) TotalSeries() int64 {
return 0
}

func (e emptyStats) LabelValuesCount(context.Context, string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func (e emptyStats) LabelValuesCardinality(context.Context, string, ...string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) {
ctx := context.Background()

indexReader, err := index.NewReader(indexByteSlice, index.DecodePostingsRaw)
indexReader, err := index.NewReader(indexByteSlice, index.DecodePostingsRaw, emptyStats{})
require.NoError(t, err)
defer func() { _ = indexReader.Close() }()

Expand Down
4 changes: 3 additions & 1 deletion pkg/storegateway/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ func queryPromSeriesChunkMetas(t testing.TB, series labels.Labels, block promtsd
series.Range(func(l labels.Label) {
matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value))
})
postings, err := promReader.PostingsForMatchers(ctx, false, matchers...)
ctx = context.WithValue(ctx, "disable_optimized_index_lookup", true)
postings, pendingMatchers, err := promReader.PostingsForMatchers(ctx, false, matchers...)
if err != nil {
require.NoError(t, err)
}
require.Empty(t, pendingMatchers)

if !postings.Next() {
require.Truef(t, false, "selecting from prometheus returned no series for %s", util.MatchersStringer(matchers))
Expand Down
18 changes: 17 additions & 1 deletion tools/splitblocks/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -200,6 +201,21 @@ func buildSeriesSpec(startOfDay time.Time) []*block.SeriesSpec {
}
}

type emptyStats struct {
}

func (e emptyStats) TotalSeries() int64 {
return 0
}

func (e emptyStats) LabelValuesCount(context.Context, string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func (e emptyStats) LabelValuesCardinality(context.Context, string, ...string) (int64, error) {
return 0, fmt.Errorf("not implemented")
}

func listSeriesAndChunksFromBlock(t *testing.T, blockDir string) []*block.SeriesSpec {
blk, err := tsdb.OpenBlock(promslog.NewNopLogger(), blockDir, nil, nil)
require.NoError(t, err)
Expand All @@ -208,7 +224,7 @@ func listSeriesAndChunksFromBlock(t *testing.T, blockDir string) []*block.Series
defer require.NoError(t, chunkReader.Close())

allKey, allValue := index.AllPostingsKey()
r, err := index.NewFileReader(filepath.Join(blockDir, block.IndexFilename), index.DecodePostingsRaw)
r, err := index.NewFileReader(filepath.Join(blockDir, block.IndexFilename), index.DecodePostingsRaw, emptyStats{})
require.NoError(t, err)
defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader")
it, err := r.Postings(context.Background(), allKey, allValue)
Expand Down
6 changes: 5 additions & 1 deletion tools/tsdb-gaps/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ func analyzeBlockForGaps(ctx context.Context, cfg config, blockDir string, match
}
defer idx.Close()

p, err := idx.PostingsForMatchers(ctx, true, matchers...)
p, pendingMatchers, err := idx.PostingsForMatchers(ctx, true, matchers...)
if err != nil {
return blockStats, err
}
if len(pendingMatchers) > 0 {
// TODO dimitarvdimitrov fix this
return blockStats, fmt.Errorf("didn't expect pending matchers: %v", pendingMatchers)
}

var builder labels.ScratchBuilder
for p.Next() {
Expand Down
Loading
Loading