diff --git a/CHANGELOG.md b/CHANGELOG.md index e145e5f3959..9cab9ff378e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702 * [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738 +* [FEATURE] Distributor: Add experimental Influx handler. #10153 * [CHANGE] Querier: pass context to queryable `IsApplicable` hook. #10451 * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256 @@ -17,7 +18,7 @@ * [CHANGE] Ruler: Add `user` and `reason` labels to `cortex_ruler_write_requests_failed_total` and `cortex_ruler_queries_failed_total`; add `user` to `cortex_ruler_write_requests_total` and `cortex_ruler_queries_total` metrics. #10536 * [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660 #10712 -* [FEATURE] Distributor: Add experimental Influx handler. #10153 +* [CHANGE] Store-gateway: Include posting sampling rate in sparse index headers. When the sampling rate isn't set in a sparse index header, store gateway will rebuild the sparse header with the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` value. If the sparse header's sampling rate is set, but doesn't match the configured rate, store gateway will either rebuild the sparse header or downsample to the configured sampling rate. #10684 * [ENHANCEMENT] Compactor: Expose `cortex_bucket_index_last_successful_update_timestamp_seconds` for all tenants assigned to the compactor before starting the block cleanup job. #10569 * [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168 @@ -65,6 +66,7 @@ * [ENHANCEMENT] All: Add `cortex_client_request_invalid_cluster_validation_labels_total` metrics, that is used by Mimir's gRPC clients to track invalid cluster validations. #10767 * [ENHANCEMENT] Ingester client: Add support to configure cluster validation for ingester clients. Failed cluster validations are tracked by `cortex_client_request_invalid_cluster_validation_labels_total` with label `client=ingester`. #10767 * [ENHANCEMENT] Add experimental metric `cortex_distributor_dropped_native_histograms_total` to measure native histograms silently dropped when native histograms are disabled for a tenant. #10760 +* [ENCHACEMENT] Compactor: Add experimental `-compactor.upload-sparse-index-headers` option. When enabled, the compactor will attempt to upload sparse index headers to object storage. This prevents latency spikes after adding store-gateway replicas. #10684 * [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185 * [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154 * [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index d561e439359..c79678c5e25 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -11629,6 +11629,17 @@ "fieldFlag": "compactor.max-lookback", "fieldType": "duration", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "upload_sparse_index_headers", + "required": false, + "desc": "If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "compactor.upload-sparse-index-headers", + "fieldType": "boolean", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index ae3fc6f5e65..84993fe1f37 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1297,6 +1297,8 @@ Usage of ./cmd/mimir/mimir: Number of symbols flushers used when doing split compaction. (default 1) -compactor.tenant-cleanup-delay duration For tenants marked for deletion, this is the time between deletion of the last block, and doing final cleanup (marker files, debug files) of the tenant. (default 6h0m0s) + -compactor.upload-sparse-index-headers + [experimental] If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally. -config.expand-env Expands ${var} or $var in config according to the values of the environment variables. -config.file value diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 089aa3afb73..fecaa7a1f86 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -71,6 +71,8 @@ The following features are currently experimental: - `-compactor.in-memory-tenant-meta-cache-size` - Limit blocks processed in each compaction cycle. Blocks uploaded prior to the maximum lookback aren't processed. - `-compactor.max-lookback` + - Enable the compactor to upload sparse index headers to object storage during compaction cycles. + - `-compactor.upload-sparse-index-headers` - Ruler - Aligning of evaluation timestamp on interval (`align_evaluation_time_on_interval`) - Allow defining limits on the maximum number of rules allowed in a rule group by namespace and the maximum number of rule groups by namespace. If set, this supersedes the `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` limits. diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index bc9df10e4d8..2f5babf55fd 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -4976,6 +4976,13 @@ sharding_ring: # blocks are considered regardless of their upload time. # CLI flag: -compactor.max-lookback [max_lookback: | default = 0s] + +# (experimental) If enabled, the compactor constructs and uploads sparse index +# headers to object storage during each compaction cycle. This allows +# store-gateway instances to use the sparse headers from object storage instead +# of recreating them locally. +# CLI flag: -compactor.upload-sparse-index-headers +[upload_sparse_index_headers: | default = false] ``` ### store_gateway diff --git a/pkg/compactor/bucket_compactor.go b/pkg/compactor/bucket_compactor.go index 1978f8ea892..626cbb5c538 100644 --- a/pkg/compactor/bucket_compactor.go +++ b/pkg/compactor/bucket_compactor.go @@ -29,8 +29,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/providers/filesystem" "go.uber.org/atomic" + "github.com/grafana/mimir/pkg/storage/indexheader" "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" @@ -394,11 +396,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul } blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger) + + // update labels and verify all blocks err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error { blockToUpload := blocksToUpload[idx] - - uploadedBlocks.Inc() - bdir := filepath.Join(subDir, blockToUpload.ulid.String()) // When splitting is enabled, we need to inject the shard ID as an external label. @@ -406,6 +407,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul if job.UseSplitting() { newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards())) } + blocksToUpload[idx].labels = newLabels newMeta, err := block.InjectThanosMeta(jobLogger, bdir, block.ThanosMeta{ Labels: newLabels, @@ -413,6 +415,7 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul Source: block.CompactorSource, SegmentFiles: block.GetSegmentFiles(bdir), }, nil) + if err != nil { return errors.Wrapf(err, "failed to finalize the block %s", bdir) } @@ -421,18 +424,47 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul return errors.Wrap(err, "remove tombstones") } - // Ensure the compacted block is valid. if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil { return errors.Wrapf(err, "invalid result block %s", bdir) } + return nil + }) + if err != nil { + return false, nil, err + } + + // Optionally build sparse-index-headers. Building sparse-index-headers is best effort, we do not skip uploading a + // compacted block if there's an error affecting sparse-index-headers. + if c.uploadSparseIndexHeaders { + // Create a bucket backed by the local compaction directory, allows calls to prepareSparseIndexHeader to + // construct sparse-index-headers without making requests to object storage. + fsbkt, err := filesystem.NewBucket(subDir) + if err != nil { + level.Warn(jobLogger).Log("msg", "failed to create filesystem bucket, skipping sparse header upload", "err", err) + return + } + _ = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error { + blockToUpload := blocksToUpload[idx] + err := prepareSparseIndexHeader(ctx, jobLogger, fsbkt, subDir, blockToUpload.ulid, c.sparseIndexHeaderSamplingRate, c.sparseIndexHeaderconfig) + if err != nil { + level.Warn(jobLogger).Log("msg", "failed to create sparse index headers", "block", blockToUpload.ulid.String(), "shard", blockToUpload.shardIndex, "err", err) + } + return nil + }) + } + // upload all blocks + err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error { + blockToUpload := blocksToUpload[idx] + uploadedBlocks.Inc() + bdir := filepath.Join(subDir, blockToUpload.ulid.String()) begin := time.Now() if err := block.Upload(ctx, jobLogger, c.bkt, bdir, nil); err != nil { return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid) } elapsed := time.Since(begin) - level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels)) + level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(blockToUpload.labels)) return nil }) if err != nil { @@ -457,10 +489,19 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket") } } - return true, compIDs, nil } +func prepareSparseIndexHeader(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string, id ulid.ULID, sampling int, cfg indexheader.Config) error { + // Calling NewStreamBinaryReader reads a block's index and writes a sparse-index-header to disk. + mets := indexheader.NewStreamBinaryReaderMetrics(nil) + br, err := indexheader.NewStreamBinaryReader(ctx, logger, bkt, dir, id, sampling, mets, cfg) + if err != nil { + return err + } + return br.Close() +} + // verifyCompactedBlocksTimeRanges does a full run over the compacted blocks // and verifies that they satisfy the min/maxTime from the source blocks func verifyCompactedBlocksTimeRanges(compIDs []ulid.ULID, sourceBlocksMinTime, sourceBlocksMaxTime int64, subDir string) error { @@ -530,6 +571,7 @@ func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob type ulidWithShardIndex struct { ulid ulid.ULID shardIndex int + labels map[string]string } // issue347Error is a type wrapper for errors that should invoke the repair process for broken block. @@ -747,20 +789,23 @@ var ownAllJobs = func(*Job) (bool, error) { // BucketCompactor compacts blocks in a bucket. type BucketCompactor struct { - logger log.Logger - sy *metaSyncer - grouper Grouper - comp Compactor - planner Planner - compactDir string - bkt objstore.Bucket - concurrency int - skipUnhealthyBlocks bool - ownJob ownCompactionJobFunc - sortJobs JobsOrderFunc - waitPeriod time.Duration - blockSyncConcurrency int - metrics *BucketCompactorMetrics + logger log.Logger + sy *metaSyncer + grouper Grouper + comp Compactor + planner Planner + compactDir string + bkt objstore.Bucket + concurrency int + skipUnhealthyBlocks bool + uploadSparseIndexHeaders bool + sparseIndexHeaderSamplingRate int + sparseIndexHeaderconfig indexheader.Config + ownJob ownCompactionJobFunc + sortJobs JobsOrderFunc + waitPeriod time.Duration + blockSyncConcurrency int + metrics *BucketCompactorMetrics } // NewBucketCompactor creates a new bucket compactor. @@ -779,25 +824,31 @@ func NewBucketCompactor( waitPeriod time.Duration, blockSyncConcurrency int, metrics *BucketCompactorMetrics, + uploadSparseIndexHeaders bool, + sparseIndexHeaderSamplingRate int, + sparseIndexHeaderconfig indexheader.Config, ) (*BucketCompactor, error) { if concurrency <= 0 { return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) } return &BucketCompactor{ - logger: logger, - sy: sy, - grouper: grouper, - planner: planner, - comp: comp, - compactDir: compactDir, - bkt: bkt, - concurrency: concurrency, - skipUnhealthyBlocks: skipUnhealthyBlocks, - ownJob: ownJob, - sortJobs: sortJobs, - waitPeriod: waitPeriod, - blockSyncConcurrency: blockSyncConcurrency, - metrics: metrics, + logger: logger, + sy: sy, + grouper: grouper, + planner: planner, + comp: comp, + compactDir: compactDir, + bkt: bkt, + concurrency: concurrency, + skipUnhealthyBlocks: skipUnhealthyBlocks, + ownJob: ownJob, + sortJobs: sortJobs, + waitPeriod: waitPeriod, + blockSyncConcurrency: blockSyncConcurrency, + metrics: metrics, + uploadSparseIndexHeaders: uploadSparseIndexHeaders, + sparseIndexHeaderSamplingRate: sparseIndexHeaderSamplingRate, + sparseIndexHeaderconfig: sparseIndexHeaderconfig, }, nil } diff --git a/pkg/compactor/bucket_compactor_e2e_test.go b/pkg/compactor/bucket_compactor_e2e_test.go index 1acc4b7814f..3ff3984f7fa 100644 --- a/pkg/compactor/bucket_compactor_e2e_test.go +++ b/pkg/compactor/bucket_compactor_e2e_test.go @@ -37,6 +37,7 @@ import ( "github.com/thanos-io/objstore/providers/filesystem" "golang.org/x/sync/errgroup" + "github.com/grafana/mimir/pkg/storage/indexheader" "github.com/grafana/mimir/pkg/storage/tsdb/block" util_log "github.com/grafana/mimir/pkg/util/log" ) @@ -240,7 +241,10 @@ func TestGroupCompactE2E(t *testing.T) { planner := NewSplitAndMergePlanner([]int64{1000, 3000}) grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger) metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, prometheus.NewPedanticRegistry()) - bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics) + cfg := indexheader.Config{VerifyOnLoad: true} + bComp, err := NewBucketCompactor( + logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics, true, 32, cfg, + ) require.NoError(t, err) // Compaction on empty should not fail. @@ -374,6 +378,21 @@ func TestGroupCompactE2E(t *testing.T) { return nil })) + // expect the blocks that are compacted to have sparse-index-headers in object storage. + require.NoError(t, bkt.Iter(ctx, "", func(n string) error { + id, ok := block.IsBlockDir(n) + if !ok { + return nil + } + + if _, ok := others[id.String()]; ok { + p := path.Join(id.String(), block.SparseIndexHeaderFilename) + exists, _ := bkt.Exists(ctx, p) + assert.True(t, exists, "expected sparse index headers not found %s", p) + } + return nil + })) + for id, found := range nonCompactedExpected { assert.True(t, found, "not found expected block %s", id.String()) } diff --git a/pkg/compactor/bucket_compactor_test.go b/pkg/compactor/bucket_compactor_test.go index 7584db247ef..fe6b2d86213 100644 --- a/pkg/compactor/bucket_compactor_test.go +++ b/pkg/compactor/bucket_compactor_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/grafana/mimir/pkg/storage/indexheader" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/util/extprom" ) @@ -118,9 +119,10 @@ func TestBucketCompactor_FilterOwnJobs(t *testing.T) { } m := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil) + cfg := indexheader.Config{VerifyOnLoad: true} for testName, testCase := range tests { t.Run(testName, func(t *testing.T) { - bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m) + bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m, false, 32, cfg) require.NoError(t, err) res, err := bc.filterOwnJobs(jobsFn()) @@ -155,8 +157,9 @@ func TestBlockMaxTimeDeltas(t *testing.T) { })) metrics := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil) + cfg := indexheader.Config{VerifyOnLoad: true} now := time.UnixMilli(1500002900159) - bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics) + bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics, true, 32, cfg) require.NoError(t, err) deltas := bc.blockMaxTimeDeltas(now, []*Job{j1, j2}) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 83b751a6478..44d074e3e63 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -33,6 +33,7 @@ import ( "go.uber.org/atomic" "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/indexheader" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/util" @@ -130,6 +131,11 @@ type Config struct { // Allow downstream projects to customise the blocks compactor. BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` + + // Allow compactor to upload sparse-index-header files + UploadSparseIndexHeaders bool `yaml:"upload_sparse_index_headers" category:"experimental"` + SparseIndexHeadersSamplingRate int `yaml:"-"` + SparseIndexHeadersConfig indexheader.Config `yaml:"-"` } // RegisterFlags registers the MultitenantCompactor flags. @@ -158,6 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is the time between deletion of the last block, and doing final cleanup (marker files, debug files) of the tenant.") f.BoolVar(&cfg.NoBlocksFileCleanupEnabled, "compactor.no-blocks-file-cleanup-enabled", false, "If enabled, will delete the bucket-index, markers and debug files in the tenant bucket when there are no blocks left in the index.") f.DurationVar(&cfg.MaxLookback, "compactor.max-lookback", 0*time.Second, "Blocks uploaded before the lookback aren't considered in compactor cycles. If set, this value should be larger than all values in `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all blocks are considered regardless of their upload time.") + f.BoolVar(&cfg.UploadSparseIndexHeaders, "compactor.upload-sparse-index-headers", false, "If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.") // compactor concurrency options f.IntVar(&cfg.MaxOpeningBlocksConcurrency, "compactor.max-opening-blocks-concurrency", 1, "Number of goroutines opening blocks before compaction.") @@ -834,6 +841,9 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e c.compactorCfg.CompactionWaitPeriod, c.compactorCfg.BlockSyncConcurrency, c.bucketCompactorMetrics, + c.compactorCfg.UploadSparseIndexHeaders, + c.compactorCfg.SparseIndexHeadersSamplingRate, + c.compactorCfg.SparseIndexHeadersConfig, ) if err != nil { return errors.Wrap(err, "failed to create bucket compactor") diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 0459d10c5ca..e486226b1ff 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -1031,6 +1031,8 @@ func (t *Mimir) initAlertManager() (serv services.Service, err error) { func (t *Mimir) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.Common.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Compactor.SparseIndexHeadersConfig = t.Cfg.BlocksStorage.BucketStore.IndexHeader + t.Cfg.Compactor.SparseIndexHeadersSamplingRate = t.Cfg.BlocksStorage.BucketStore.PostingOffsetsInMemSampling t.Compactor, err = compactor.NewMultitenantCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, t.Registerer) if err != nil { diff --git a/pkg/storegateway/indexheader/binary_reader.go b/pkg/storage/indexheader/binary_reader.go similarity index 100% rename from pkg/storegateway/indexheader/binary_reader.go rename to pkg/storage/indexheader/binary_reader.go diff --git a/pkg/storegateway/indexheader/encoding/encoding.go b/pkg/storage/indexheader/encoding/encoding.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/encoding.go rename to pkg/storage/indexheader/encoding/encoding.go diff --git a/pkg/storegateway/indexheader/encoding/encoding_test.go b/pkg/storage/indexheader/encoding/encoding_test.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/encoding_test.go rename to pkg/storage/indexheader/encoding/encoding_test.go diff --git a/pkg/storegateway/indexheader/encoding/factory.go b/pkg/storage/indexheader/encoding/factory.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/factory.go rename to pkg/storage/indexheader/encoding/factory.go diff --git a/pkg/storegateway/indexheader/encoding/factory_test.go b/pkg/storage/indexheader/encoding/factory_test.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/factory_test.go rename to pkg/storage/indexheader/encoding/factory_test.go diff --git a/pkg/storegateway/indexheader/encoding/reader.go b/pkg/storage/indexheader/encoding/reader.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/reader.go rename to pkg/storage/indexheader/encoding/reader.go diff --git a/pkg/storegateway/indexheader/encoding/reader_test.go b/pkg/storage/indexheader/encoding/reader_test.go similarity index 100% rename from pkg/storegateway/indexheader/encoding/reader_test.go rename to pkg/storage/indexheader/encoding/reader_test.go diff --git a/pkg/storegateway/indexheader/header.go b/pkg/storage/indexheader/header.go similarity index 98% rename from pkg/storegateway/indexheader/header.go rename to pkg/storage/indexheader/header.go index c018351d4d5..108311dc46e 100644 --- a/pkg/storegateway/indexheader/header.go +++ b/pkg/storage/indexheader/header.go @@ -13,7 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/index" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" ) const ( diff --git a/pkg/storegateway/indexheader/header_test.go b/pkg/storage/indexheader/header_test.go similarity index 71% rename from pkg/storegateway/indexheader/header_test.go rename to pkg/storage/indexheader/header_test.go index 87ecde23d23..bb12e36698a 100644 --- a/pkg/storegateway/indexheader/header_test.go +++ b/pkg/storage/indexheader/header_test.go @@ -129,6 +129,169 @@ func TestReadersComparedToIndexHeader(t *testing.T) { } +func Test_DownsampleSparseIndexHeader(t *testing.T) { + tests := map[string]struct { + protoRate int + inMemSamplingRate int + expected map[string]int + }{ + "downsample_1_to_32": { + protoRate: 1, + inMemSamplingRate: 32, + expected: map[string]int{ + "__name__": 4, + "": 1, + "__blockgen_target__": 4, + }, + }, + "downsample_4_to_16": { + protoRate: 4, + inMemSamplingRate: 16, + expected: map[string]int{ + "__name__": 7, + "": 1, + "__blockgen_target__": 7, + }, + }, + "downsample_8_to_24": { + protoRate: 8, + inMemSamplingRate: 24, + expected: map[string]int{ + "__name__": 5, + "": 1, + "__blockgen_target__": 5, + }, + }, + "downsample_17_to_51": { + protoRate: 17, + inMemSamplingRate: 51, + expected: map[string]int{ + "__name__": 3, + "": 1, + "__blockgen_target__": 3, + }, + }, + "noop_on_same_sampling_rate": { + protoRate: 32, + inMemSamplingRate: 32, + }, + "rebuild_proto_sampling_rate_not_divisible": { + protoRate: 8, + inMemSamplingRate: 20, + }, + "rebuild_cannot_upsample_from_proto_48_to_32": { + protoRate: 48, + inMemSamplingRate: 32, + }, + "rebuild_cannot_upsample_from_proto_64_to_32": { + protoRate: 64, + inMemSamplingRate: 32, + }, + "downsample_to_low_frequency": { + protoRate: 4, + inMemSamplingRate: 16384, + expected: map[string]int{ + "__name__": 2, + "": 1, + "__blockgen_target__": 2, + }, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + m, err := block.ReadMetaFromDir("./testdata/index_format_v2") + require.NoError(t, err) + + tmpDir := t.TempDir() + test.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String())) + + bkt, err := filesystem.NewBucket(tmpDir) + require.NoError(t, err) + + ctx := context.Background() + noopMetrics := NewStreamBinaryReaderMetrics(nil) + + // write a sparse index-header file to disk + br1, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, m.ULID, tt.protoRate, noopMetrics, Config{}) + require.NoError(t, err) + require.Equal(t, tt.protoRate, br1.postingsOffsetTable.PostingOffsetInMemSampling()) + + origLabelNames, err := br1.postingsOffsetTable.LabelNames() + require.NoError(t, err) + + // a second call to NewStreamBinaryReader loads the previously written sparse index-header and downsamples + // the header from tt.protoRate to tt.inMemSamplingRate entries for each posting + br2, err := NewStreamBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, m.ULID, tt.inMemSamplingRate, noopMetrics, Config{}) + require.NoError(t, err) + require.Equal(t, tt.inMemSamplingRate, br2.postingsOffsetTable.PostingOffsetInMemSampling()) + + downsampleLabelNames, err := br2.postingsOffsetTable.LabelNames() + require.NoError(t, err) + + // label names are equal between original and downsampled sparse index-headers + require.ElementsMatch(t, downsampleLabelNames, origLabelNames) + + origIdxpbTbl := br1.postingsOffsetTable.NewSparsePostingOffsetTable() + downsampleIdxpbTbl := br2.postingsOffsetTable.NewSparsePostingOffsetTable() + + for name, vals := range origIdxpbTbl.Postings { + downsampledOffsets := downsampleIdxpbTbl.Postings[name].Offsets + // downsampled postings are a subset of the original sparse index-header postings + if (tt.inMemSamplingRate > tt.protoRate) && (tt.inMemSamplingRate%tt.protoRate == 0) { + require.Equal(t, tt.expected[name], len(downsampledOffsets)) + require.Subset(t, vals.Offsets, downsampledOffsets, "downsampled offsets not a subset of original for name '%s'", name) + + require.Equal(t, downsampledOffsets[0], vals.Offsets[0], "downsampled offsets do not contain first value for name '%s'", name) + require.Equal(t, downsampledOffsets[len(downsampledOffsets)-1], vals.Offsets[len(vals.Offsets)-1], "downsampled offsets do not contain last value for name '%s'", name) + } + + // check first and last entry from the original postings in downsampled set + require.NotZero(t, downsampleIdxpbTbl.Postings[name].LastValOffset) + } + }) + } +} + +func compareIndexToHeaderPostings(t *testing.T, indexByteSlice index.ByteSlice, sbr *StreamBinaryReader) { + + ir, err := index.NewReader(indexByteSlice, index.DecodePostingsRaw) + require.NoError(t, err) + defer func() { + _ = ir.Close() + }() + + toc, err := index.NewTOCFromByteSlice(indexByteSlice) + require.NoError(t, err) + + tblOffsetBounds := make(map[string][2]int64) + + // Read the postings offset table and record first and last offset for each label. Adjust offsets in ReadPostingsOffsetTable + // by 4B (int32 count of postings in table) to align with postings in index headers. + err = index.ReadPostingsOffsetTable(indexByteSlice, toc.PostingsTable, func(label []byte, _ []byte, _ uint64, offset int) error { + name := string(label) + off := int64(offset + 4) + if v, ok := tblOffsetBounds[name]; ok { + v[1] = off + tblOffsetBounds[name] = v + } else { + tblOffsetBounds[name] = [2]int64{off, off} + } + return nil + }) + require.NoError(t, err) + + tbl := sbr.postingsOffsetTable.NewSparsePostingOffsetTable() + + expLabelNames, err := ir.LabelNames(context.Background()) + require.NoError(t, err) + for _, lname := range expLabelNames { + offsets := tbl.Postings[lname].Offsets + assert.Equal(t, offsets[0].TableOff, tblOffsetBounds[lname][0]) + assert.Equal(t, offsets[len(offsets)-1].TableOff, tblOffsetBounds[lname][1]) + } +} + func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) { ctx := context.Background() @@ -192,15 +355,22 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe for _, v := range valOffsets { ptr, err := headerReader.PostingsOffset(ctx, lname, v.LabelValue) require.NoError(t, err) - assert.Equal(t, expRanges[labels.Label{Name: lname, Value: v.LabelValue}], ptr) - assert.Equal(t, expRanges[labels.Label{Name: lname, Value: v.LabelValue}], v.Off) + label := labels.Label{Name: lname, Value: v.LabelValue} + assert.Equal(t, expRanges[label], ptr) + assert.Equal(t, expRanges[label], v.Off) + delete(expRanges, label) } } + allPName, allPValue := index.AllPostingsKey() ptr, err := headerReader.PostingsOffset(ctx, allPName, allPValue) require.NoError(t, err) - require.Equal(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start) - require.Equal(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End) + + emptyLabel := labels.Label{Name: "", Value: ""} + require.Equal(t, expRanges[emptyLabel].Start, ptr.Start) + require.Equal(t, expRanges[emptyLabel].End, ptr.End) + delete(expRanges, emptyLabel) + require.Empty(t, expRanges) } func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *block.Meta { diff --git a/pkg/storegateway/indexheader/index/postings.go b/pkg/storage/indexheader/index/postings.go similarity index 91% rename from pkg/storegateway/indexheader/index/postings.go rename to pkg/storage/indexheader/index/postings.go index 8b3d2de39f7..28505dffbab 100644 --- a/pkg/storegateway/indexheader/index/postings.go +++ b/pkg/storage/indexheader/index/postings.go @@ -17,8 +17,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/index" - streamencoding "github.com/grafana/mimir/pkg/storegateway/indexheader/encoding" - "github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb" + streamencoding "github.com/grafana/mimir/pkg/storage/indexheader/encoding" + "github.com/grafana/mimir/pkg/storage/indexheader/indexheaderpb" ) const ( @@ -43,6 +43,10 @@ type PostingOffsetTable interface { LabelNames() ([]string, error) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) + + // PostingOffsetInMemSampling returns the inverse of the fraction of postings held in memory. A lower value indicates + // postings are sample more frequently. + PostingOffsetInMemSampling() int } // PostingListOffset contains the start and end offset of a posting list. @@ -234,18 +238,42 @@ func NewPostingOffsetTableFromSparseHeader(factory *streamencoding.DecbufFactory postingOffsetsInMemSampling: postingOffsetsInMemSampling, } + pbSampling := int(postingsOffsetTable.GetPostingOffsetInMemorySampling()) + if pbSampling == 0 { + return nil, fmt.Errorf("sparse index-header sampling rate not set") + } + + if pbSampling > postingOffsetsInMemSampling { + return nil, fmt.Errorf("sparse index-header sampling rate exceeds in-mem-sampling rate") + } + + // if the sampling rate in the sparse index-header is set lower (more frequent) than + // the configured postingOffsetsInMemSampling we downsample to the configured rate + step, ok := stepSize(pbSampling, postingOffsetsInMemSampling) + if !ok { + return nil, fmt.Errorf("sparse index-header sampling rate not compatible with in-mem-sampling rate") + } + for sName, sOffsets := range postingsOffsetTable.Postings { - t.postings[sName] = &postingValueOffsets{ - offsets: make([]postingOffset, len(sOffsets.Offsets)), + + olen := len(sOffsets.Offsets) + downsampledLen := (olen + step - 1) / step + if (olen > 1) && (downsampledLen == 1) { + downsampledLen++ } + t.postings[sName] = &postingValueOffsets{offsets: make([]postingOffset, downsampledLen)} for i, sPostingOff := range sOffsets.Offsets { - t.postings[sName].offsets[i] = postingOffset{value: sPostingOff.Value, tableOff: int(sPostingOff.TableOff)} - } + if i%step == 0 { + t.postings[sName].offsets[i/step] = postingOffset{value: sPostingOff.Value, tableOff: int(sPostingOff.TableOff)} + } + if i == olen-1 { + t.postings[sName].offsets[downsampledLen-1] = postingOffset{value: sPostingOff.Value, tableOff: int(sPostingOff.TableOff)} + } + } t.postings[sName].lastValOffset = sOffsets.LastValOffset } - return &t, err } @@ -330,6 +358,10 @@ func (t *PostingOffsetTableV1) LabelNames() ([]string, error) { return labelNames, nil } +func (t *PostingOffsetTableV1) PostingOffsetInMemSampling() int { + return 0 +} + func (t *PostingOffsetTableV1) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) { return &indexheaderpb.PostingOffsetTable{} } @@ -608,10 +640,18 @@ func (t *PostingOffsetTableV2) LabelNames() ([]string, error) { return labelNames, nil } +func (t *PostingOffsetTableV2) PostingOffsetInMemSampling() int { + if t != nil { + return t.postingOffsetsInMemSampling + } + return 0 +} + // NewSparsePostingOffsetTable loads all postings offset table data into a sparse index-header to be persisted to disk func (t *PostingOffsetTableV2) NewSparsePostingOffsetTable() (table *indexheaderpb.PostingOffsetTable) { sparseHeaders := &indexheaderpb.PostingOffsetTable{ - Postings: make(map[string]*indexheaderpb.PostingValueOffsets, len(t.postings)), + Postings: make(map[string]*indexheaderpb.PostingValueOffsets, len(t.postings)), + PostingOffsetInMemorySampling: int64(t.postingOffsetsInMemSampling), } for name, offsets := range t.postings { @@ -640,3 +680,10 @@ func skipNAndName(d *streamencoding.Decbuf, buf *int) { } d.Skip(*buf) } + +func stepSize(cur, tgt int) (int, bool) { + if cur > tgt || cur <= 0 || tgt <= 0 || tgt%cur != 0 { + return 0, false + } + return tgt / cur, true +} diff --git a/pkg/storage/indexheader/index/postings_test.go b/pkg/storage/indexheader/index/postings_test.go new file mode 100644 index 00000000000..e3a76723391 --- /dev/null +++ b/pkg/storage/indexheader/index/postings_test.go @@ -0,0 +1,228 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package index + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + streamencoding "github.com/grafana/mimir/pkg/storage/indexheader/encoding" + "github.com/grafana/mimir/pkg/storage/indexheader/indexheaderpb" +) + +func TestPostingValueOffsets(t *testing.T) { + testCases := map[string]struct { + existingOffsets []postingOffset + prefix string + expectedFound bool + expectedStart int + expectedEnd int + }{ + "prefix not found": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "a", + expectedFound: false, + }, + "prefix matches only one sampled offset": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "02", + expectedFound: true, + expectedStart: 1, + expectedEnd: 2, + }, + "prefix matches all offsets": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "0", + expectedFound: true, + expectedStart: 0, + expectedEnd: 4, + }, + "prefix matches only last offset": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "031", + expectedFound: true, + expectedStart: 3, + expectedEnd: 4, + }, + "prefix matches multiple offsets": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "020"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "02", + expectedFound: true, + expectedStart: 1, + expectedEnd: 3, + }, + "prefix matches only first offset": { + existingOffsets: []postingOffset{ + {value: "010"}, + {value: "019"}, + {value: "020"}, + {value: "030"}, + {value: "031"}, + }, + prefix: "015", + expectedFound: true, + expectedStart: 0, + expectedEnd: 1, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + offsets := postingValueOffsets{offsets: testCase.existingOffsets} + start, end, found := offsets.prefixOffsets(testCase.prefix) + assert.Equal(t, testCase.expectedStart, start) + assert.Equal(t, testCase.expectedEnd, end) + assert.Equal(t, testCase.expectedFound, found) + }) + } +} + +func createPostingOffset(n int) []*indexheaderpb.PostingOffset { + offsets := make([]*indexheaderpb.PostingOffset, n) + for i := 0; i < n; i++ { + offsets[i] = &indexheaderpb.PostingOffset{Value: fmt.Sprintf("%d", i), TableOff: int64(i)} + } + return offsets +} + +func Test_NewPostingOffsetTableFromSparseHeader(t *testing.T) { + + testCases := map[string]struct { + existingOffsetsLen int + postingOffsetsInMemSamplingRate int + protoSamplingRate int64 + expectedLen int + expectErr bool + }{ + "downsample_noop_proto_has_equal_sampling_rate": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 32, + expectedLen: 100, + }, + "downsample_noop_preserve": { + existingOffsetsLen: 1, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 1, + }, + "downsample_noop_retain_first_and_last_posting": { + existingOffsetsLen: 2, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 2, + }, + "downsample_noop_retain_first_and_last_posting_larger_sampling_rates_ratio": { + existingOffsetsLen: 2, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 8, + expectedLen: 2, + }, + "downsample_short_offsets": { + existingOffsetsLen: 2, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 2, + }, + "downsample_noop_short_offsets": { + existingOffsetsLen: 1, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 1, + }, + "downsample_proto_has_divisible_sampling_rate": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 50, + }, + "cannot_downsample_proto_has_no_sampling_rate": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 0, + expectErr: true, + }, + "cannot_upsample_proto_has_less_frequent_sampling_rate": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 64, + expectErr: true, + }, + "cannot_downsample_proto_has_non_divisible_sampling_rate": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 10, + expectErr: true, + }, + "downsample_sampling_rates_ratio_does_not_divide_offsets": { + existingOffsetsLen: 33, + postingOffsetsInMemSamplingRate: 32, + protoSamplingRate: 16, + expectedLen: 17, + }, + "downsample_sampling_rates_ratio_exceeds_offset_len": { + existingOffsetsLen: 10, + postingOffsetsInMemSamplingRate: 1024, + protoSamplingRate: 8, + expectedLen: 2, + }, + "downsample_sampling_rates_ratio_equals_offset_len": { + existingOffsetsLen: 100, + postingOffsetsInMemSamplingRate: 100, + protoSamplingRate: 1, + expectedLen: 2, + }, + } + + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + factory := streamencoding.DecbufFactory{} + + postingsMap := make(map[string]*indexheaderpb.PostingValueOffsets) + postingsMap["__name__"] = &indexheaderpb.PostingValueOffsets{Offsets: createPostingOffset(testCase.existingOffsetsLen)} + + protoTbl := indexheaderpb.PostingOffsetTable{ + Postings: postingsMap, + PostingOffsetInMemorySampling: testCase.protoSamplingRate, + } + + tbl, err := NewPostingOffsetTableFromSparseHeader(&factory, &protoTbl, 0, testCase.postingOffsetsInMemSamplingRate) + if testCase.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, testCase.expectedLen, len(tbl.postings["__name__"].offsets)) + } + + }) + } + +} diff --git a/pkg/storegateway/indexheader/index/symbols.go b/pkg/storage/indexheader/index/symbols.go similarity index 98% rename from pkg/storegateway/indexheader/index/symbols.go rename to pkg/storage/indexheader/index/symbols.go index 649e2a43c33..e6268655501 100644 --- a/pkg/storegateway/indexheader/index/symbols.go +++ b/pkg/storage/indexheader/index/symbols.go @@ -16,8 +16,8 @@ import ( "github.com/grafana/dskit/runutil" "github.com/prometheus/prometheus/tsdb/index" - streamencoding "github.com/grafana/mimir/pkg/storegateway/indexheader/encoding" - "github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb" + streamencoding "github.com/grafana/mimir/pkg/storage/indexheader/encoding" + "github.com/grafana/mimir/pkg/storage/indexheader/indexheaderpb" ) // The table gets initialized with sync.Once but may still cause a race diff --git a/pkg/storegateway/indexheader/index/symbols_test.go b/pkg/storage/indexheader/index/symbols_test.go similarity index 98% rename from pkg/storegateway/indexheader/index/symbols_test.go rename to pkg/storage/indexheader/index/symbols_test.go index 81d1fc42a56..eb399f43330 100644 --- a/pkg/storegateway/indexheader/index/symbols_test.go +++ b/pkg/storage/indexheader/index/symbols_test.go @@ -18,7 +18,7 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/require" - streamencoding "github.com/grafana/mimir/pkg/storegateway/indexheader/encoding" + streamencoding "github.com/grafana/mimir/pkg/storage/indexheader/encoding" "github.com/grafana/mimir/pkg/util/test" ) diff --git a/pkg/storegateway/indexheader/indexheaderpb/sparse.pb.go b/pkg/storage/indexheader/indexheaderpb/sparse.pb.go similarity index 90% rename from pkg/storegateway/indexheader/indexheaderpb/sparse.pb.go rename to pkg/storage/indexheader/indexheaderpb/sparse.pb.go index 4e96f6aeba7..2408ca63c3e 100644 --- a/pkg/storegateway/indexheader/indexheaderpb/sparse.pb.go +++ b/pkg/storage/indexheader/indexheaderpb/sparse.pb.go @@ -130,7 +130,8 @@ func (m *Symbols) GetSymbolsCount() int64 { type PostingOffsetTable struct { // Postings is a map of label names -> PostingValueOffsets - Postings map[string]*PostingValueOffsets `protobuf:"bytes,1,rep,name=postings,proto3" json:"postings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Postings map[string]*PostingValueOffsets `protobuf:"bytes,1,rep,name=postings,proto3" json:"postings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + PostingOffsetInMemorySampling int64 `protobuf:"varint,2,opt,name=postingOffsetInMemorySampling,proto3" json:"postingOffsetInMemorySampling,omitempty"` } func (m *PostingOffsetTable) Reset() { *m = PostingOffsetTable{} } @@ -172,6 +173,13 @@ func (m *PostingOffsetTable) GetPostings() map[string]*PostingValueOffsets { return nil } +func (m *PostingOffsetTable) GetPostingOffsetInMemorySampling() int64 { + if m != nil { + return m.PostingOffsetInMemorySampling + } + return 0 +} + // PostingValueOffsets stores a list of the first, last, and every 32nd (config default) PostingOffset for this label name. type PostingValueOffsets struct { Offsets []*PostingOffset `protobuf:"bytes,1,rep,name=offsets,proto3" json:"offsets,omitempty"` @@ -289,33 +297,34 @@ func init() { func init() { proto.RegisterFile("sparse.proto", fileDescriptor_c442573753a956c7) } var fileDescriptor_c442573753a956c7 = []byte{ - // 402 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0x3f, 0x4f, 0xf2, 0x40, - 0x18, 0xef, 0xd1, 0xbc, 0xc0, 0x7b, 0x40, 0x42, 0x0e, 0x63, 0x08, 0x21, 0x17, 0x6d, 0x1c, 0x58, - 0x2c, 0x06, 0x13, 0x43, 0xdc, 0x84, 0x38, 0x31, 0x60, 0x8a, 0x61, 0x70, 0x31, 0xad, 0xb4, 0x85, - 0x58, 0x7a, 0x4d, 0xef, 0x6a, 0x64, 0xf3, 0x1b, 0xe8, 0xc7, 0xf0, 0x73, 0x38, 0x39, 0x32, 0x32, - 0x4a, 0x59, 0x1c, 0xf9, 0x08, 0xa6, 0x77, 0x2d, 0x5a, 0x25, 0xba, 0xdd, 0xf3, 0x3c, 0xbf, 0x3f, - 0xcf, 0xf3, 0xcb, 0xc1, 0x22, 0xf5, 0x74, 0x9f, 0x9a, 0xaa, 0xe7, 0x13, 0x46, 0x50, 0x69, 0xe2, - 0x8e, 0xcc, 0xfb, 0xb1, 0xa9, 0x8f, 0x4c, 0xdf, 0x33, 0x6a, 0x87, 0xf6, 0x84, 0x8d, 0x03, 0x43, - 0xbd, 0x21, 0xd3, 0xa6, 0x4d, 0x6c, 0xd2, 0xe4, 0x28, 0x23, 0xb0, 0x78, 0xc5, 0x0b, 0xfe, 0x12, - 0x6c, 0xe5, 0x11, 0xc0, 0xec, 0x80, 0xcb, 0xa1, 0x23, 0x98, 0xa3, 0xb3, 0xa9, 0x41, 0x1c, 0x5a, - 0x05, 0x7b, 0xa0, 0x51, 0x68, 0xed, 0xaa, 0x29, 0x69, 0x75, 0x20, 0xa6, 0x5a, 0x02, 0x43, 0x03, - 0x58, 0xf1, 0x08, 0x65, 0x13, 0xd7, 0xa6, 0x7d, 0xcb, 0xa2, 0x26, 0xbb, 0xd4, 0x0d, 0xc7, 0xac, - 0x66, 0x38, 0x7b, 0xff, 0x1b, 0xfb, 0x42, 0x20, 0xbf, 0x00, 0xb5, 0x6d, 0x6c, 0xa5, 0x07, 0x73, - 0xb1, 0x11, 0xaa, 0xc3, 0x1c, 0xe1, 0x93, 0x68, 0x23, 0xb9, 0x21, 0x77, 0x32, 0x65, 0xa0, 0x25, - 0x2d, 0xa4, 0xc0, 0x62, 0xbc, 0x48, 0x97, 0x04, 0x2e, 0xe3, 0xb6, 0xb2, 0x96, 0xea, 0x29, 0x2f, - 0x00, 0xa2, 0x9f, 0xc6, 0xa8, 0x07, 0xf3, 0x89, 0x35, 0x57, 0x2e, 0xb4, 0x9a, 0x7f, 0x6e, 0x9b, - 0xb4, 0xe8, 0xb9, 0xcb, 0xfc, 0x99, 0xb6, 0x11, 0xa8, 0x5d, 0xc3, 0x52, 0x6a, 0x84, 0xca, 0x50, - 0xbe, 0x35, 0x67, 0x3c, 0xc4, 0xff, 0x5a, 0xf4, 0x44, 0x6d, 0xf8, 0xef, 0x4e, 0x77, 0x82, 0x24, - 0x1a, 0x65, 0xbb, 0xd9, 0x30, 0x82, 0x08, 0x47, 0xaa, 0x09, 0xc2, 0x69, 0xa6, 0x0d, 0x14, 0x0a, - 0x2b, 0x5b, 0x10, 0xe8, 0x24, 0x9d, 0x4e, 0xa1, 0x55, 0xff, 0xed, 0x86, 0xcf, 0xdc, 0x0e, 0x60, - 0xc9, 0xd1, 0x29, 0x1b, 0xea, 0x8e, 0x98, 0xc4, 0xc1, 0xa5, 0x9b, 0xca, 0xd9, 0xe6, 0x2a, 0xd1, - 0x40, 0x3b, 0xc9, 0x0d, 0xe2, 0x2e, 0x51, 0xa0, 0x1a, 0xcc, 0xb3, 0x28, 0x9d, 0xbe, 0x65, 0xc5, - 0x3a, 0x9b, 0xba, 0xd3, 0x9d, 0x2f, 0xb1, 0xb4, 0x58, 0x62, 0x69, 0xbd, 0xc4, 0xe0, 0x21, 0xc4, - 0xe0, 0x39, 0xc4, 0xe0, 0x35, 0xc4, 0x60, 0x1e, 0x62, 0xf0, 0x16, 0x62, 0xf0, 0x1e, 0x62, 0x69, - 0x1d, 0x62, 0xf0, 0xb4, 0xc2, 0xd2, 0x7c, 0x85, 0xa5, 0xc5, 0x0a, 0x4b, 0x57, 0xe9, 0xff, 0x6c, - 0x64, 0xf9, 0x3f, 0x3d, 0xfe, 0x08, 0x00, 0x00, 0xff, 0xff, 0x81, 0x2f, 0x9e, 0xda, 0xf5, 0x02, - 0x00, 0x00, + // 432 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0xb1, 0x8e, 0xd3, 0x40, + 0x10, 0xf5, 0xda, 0xe2, 0x72, 0x4c, 0x2e, 0xd2, 0x69, 0x0f, 0xa1, 0x53, 0x74, 0xac, 0xc0, 0xa2, + 0xb8, 0x06, 0x07, 0x05, 0x09, 0x9d, 0xe8, 0xb8, 0x40, 0x81, 0x22, 0x14, 0x64, 0xa3, 0x14, 0x34, + 0xc8, 0x26, 0x6b, 0xc7, 0xc2, 0xf6, 0x5a, 0xde, 0x35, 0xc2, 0x1d, 0x25, 0x1d, 0x7c, 0x06, 0x9f, + 0x42, 0x99, 0x32, 0x25, 0x71, 0x1a, 0xca, 0x7c, 0x02, 0xf2, 0xae, 0x1d, 0x62, 0x88, 0x72, 0xdd, + 0xce, 0xcc, 0x9b, 0xf7, 0xde, 0xbc, 0xc4, 0x70, 0xc2, 0x53, 0x37, 0xe3, 0xd4, 0x4a, 0x33, 0x26, + 0x18, 0xee, 0x85, 0xc9, 0x8c, 0x7e, 0x9e, 0x53, 0x77, 0x46, 0xb3, 0xd4, 0xeb, 0x3f, 0x0a, 0x42, + 0x31, 0xcf, 0x3d, 0xeb, 0x03, 0x8b, 0x07, 0x01, 0x0b, 0xd8, 0x40, 0xa2, 0xbc, 0xdc, 0x97, 0x95, + 0x2c, 0xe4, 0x4b, 0x6d, 0x9b, 0xdf, 0x10, 0x1c, 0x39, 0x92, 0x0e, 0x3f, 0x86, 0x0e, 0x2f, 0x62, + 0x8f, 0x45, 0xfc, 0x1c, 0xdd, 0x47, 0x97, 0xdd, 0xe1, 0x5d, 0xab, 0x45, 0x6d, 0x39, 0x6a, 0x6a, + 0x37, 0x30, 0xec, 0xc0, 0x59, 0xca, 0xb8, 0x08, 0x93, 0x80, 0x4f, 0x7c, 0x9f, 0x53, 0xf1, 0xd6, + 0xf5, 0x22, 0x7a, 0xae, 0xcb, 0xed, 0x07, 0xff, 0x6c, 0xbf, 0x51, 0xc8, 0x1d, 0xa0, 0xbd, 0x6f, + 0xdb, 0x1c, 0x43, 0xa7, 0x16, 0xc2, 0x17, 0xd0, 0x61, 0x72, 0x52, 0x39, 0x32, 0x2e, 0x8d, 0x6b, + 0xfd, 0x14, 0xd9, 0x4d, 0x0b, 0x9b, 0x70, 0x52, 0x1b, 0x19, 0xb1, 0x3c, 0x11, 0x52, 0xd6, 0xb0, + 0x5b, 0x3d, 0xf3, 0xab, 0x0e, 0xf8, 0x7f, 0x61, 0x3c, 0x86, 0xe3, 0x46, 0x5a, 0x32, 0x77, 0x87, + 0x83, 0x1b, 0xdd, 0x36, 0x2d, 0xfe, 0x32, 0x11, 0x59, 0x61, 0x6f, 0x09, 0xf0, 0x0b, 0xb8, 0x97, + 0xee, 0xa2, 0x5f, 0x25, 0xaf, 0x69, 0xcc, 0xb2, 0xc2, 0x71, 0xe3, 0x34, 0x0a, 0x93, 0xa0, 0x36, + 0x76, 0x18, 0xd4, 0x7f, 0x0f, 0xbd, 0x96, 0x00, 0x3e, 0x05, 0xe3, 0x23, 0x2d, 0xe4, 0x4f, 0x71, + 0xdb, 0xae, 0x9e, 0xf8, 0x0a, 0x6e, 0x7d, 0x72, 0xa3, 0xbc, 0x09, 0xd8, 0xdc, 0x6f, 0x79, 0x5a, + 0x41, 0x94, 0x08, 0xb7, 0xd5, 0xc2, 0x33, 0xfd, 0x0a, 0x99, 0x1c, 0xce, 0xf6, 0x20, 0xf0, 0xd3, + 0x76, 0xc6, 0xdd, 0xe1, 0xc5, 0xa1, 0x24, 0xfe, 0xa6, 0xff, 0x10, 0x7a, 0x91, 0xcb, 0xc5, 0xd4, + 0x8d, 0xd4, 0xa4, 0xbe, 0xb2, 0xdd, 0x34, 0x9f, 0x6f, 0xaf, 0x52, 0x0d, 0x7c, 0xa7, 0xb9, 0x41, + 0xdd, 0xa5, 0x0a, 0xdc, 0x87, 0x63, 0x51, 0x65, 0x3c, 0xf1, 0xfd, 0x9a, 0x67, 0x5b, 0x5f, 0x8f, + 0x16, 0x2b, 0xa2, 0x2d, 0x57, 0x44, 0xdb, 0xac, 0x08, 0xfa, 0x52, 0x12, 0xf4, 0xa3, 0x24, 0xe8, + 0x67, 0x49, 0xd0, 0xa2, 0x24, 0xe8, 0x57, 0x49, 0xd0, 0xef, 0x92, 0x68, 0x9b, 0x92, 0xa0, 0xef, + 0x6b, 0xa2, 0x2d, 0xd6, 0x44, 0x5b, 0xae, 0x89, 0xf6, 0xae, 0xfd, 0x55, 0x78, 0x47, 0xf2, 0xdf, + 0xfe, 0xe4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x91, 0x1d, 0x5b, 0x3b, 0x03, 0x00, 0x00, } func (this *Sparse) Equal(that interface{}) bool { @@ -404,6 +413,9 @@ func (this *PostingOffsetTable) Equal(that interface{}) bool { return false } } + if this.PostingOffsetInMemorySampling != that1.PostingOffsetInMemorySampling { + return false + } return true } func (this *PostingValueOffsets) Equal(that interface{}) bool { @@ -495,7 +507,7 @@ func (this *PostingOffsetTable) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 5) + s := make([]string, 0, 6) s = append(s, "&indexheaderpb.PostingOffsetTable{") keysForPostings := make([]string, 0, len(this.Postings)) for k, _ := range this.Postings { @@ -510,6 +522,7 @@ func (this *PostingOffsetTable) GoString() string { if this.Postings != nil { s = append(s, "Postings: "+mapStringForPostings+",\n") } + s = append(s, "PostingOffsetInMemorySampling: "+fmt.Sprintf("%#v", this.PostingOffsetInMemorySampling)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -659,6 +672,11 @@ func (m *PostingOffsetTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.PostingOffsetInMemorySampling != 0 { + i = encodeVarintSparse(dAtA, i, uint64(m.PostingOffsetInMemorySampling)) + i-- + dAtA[i] = 0x10 + } if len(m.Postings) > 0 { for k := range m.Postings { v := m.Postings[k] @@ -831,6 +849,9 @@ func (m *PostingOffsetTable) Size() (n int) { n += mapEntrySize + 1 + sovSparse(uint64(mapEntrySize)) } } + if m.PostingOffsetInMemorySampling != 0 { + n += 1 + sovSparse(uint64(m.PostingOffsetInMemorySampling)) + } return n } @@ -912,6 +933,7 @@ func (this *PostingOffsetTable) String() string { mapStringForPostings += "}" s := strings.Join([]string{`&PostingOffsetTable{`, `Postings:` + mapStringForPostings + `,`, + `PostingOffsetInMemorySampling:` + fmt.Sprintf("%v", this.PostingOffsetInMemorySampling) + `,`, `}`, }, "") return s @@ -1382,6 +1404,25 @@ func (m *PostingOffsetTable) Unmarshal(dAtA []byte) error { } m.Postings[mapkey] = mapvalue iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PostingOffsetInMemorySampling", wireType) + } + m.PostingOffsetInMemorySampling = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSparse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PostingOffsetInMemorySampling |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSparse(dAtA[iNdEx:]) diff --git a/pkg/storegateway/indexheader/indexheaderpb/sparse.proto b/pkg/storage/indexheader/indexheaderpb/sparse.proto similarity index 96% rename from pkg/storegateway/indexheader/indexheaderpb/sparse.proto rename to pkg/storage/indexheader/indexheaderpb/sparse.proto index c20c45f1ebd..6027e938f88 100644 --- a/pkg/storegateway/indexheader/indexheaderpb/sparse.proto +++ b/pkg/storage/indexheader/indexheaderpb/sparse.proto @@ -26,6 +26,7 @@ message Symbols { message PostingOffsetTable { // Postings is a map of label names -> PostingValueOffsets map postings = 1; + int64 postingOffsetInMemorySampling = 2; } // PostingValueOffsets stores a list of the first, last, and every 32nd (config default) PostingOffset for this label name. diff --git a/pkg/storegateway/indexheader/lazy_binary_reader.go b/pkg/storage/indexheader/lazy_binary_reader.go similarity index 99% rename from pkg/storegateway/indexheader/lazy_binary_reader.go rename to pkg/storage/indexheader/lazy_binary_reader.go index 98c587ced42..40cb377a38e 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader.go +++ b/pkg/storage/indexheader/lazy_binary_reader.go @@ -24,8 +24,8 @@ import ( "github.com/thanos-io/objstore" "go.uber.org/atomic" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/tsdb/block" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" ) var ( diff --git a/pkg/storegateway/indexheader/lazy_binary_reader_test.go b/pkg/storage/indexheader/lazy_binary_reader_test.go similarity index 99% rename from pkg/storegateway/indexheader/lazy_binary_reader_test.go rename to pkg/storage/indexheader/lazy_binary_reader_test.go index 13e79f92b5e..ab038957835 100644 --- a/pkg/storegateway/indexheader/lazy_binary_reader_test.go +++ b/pkg/storage/indexheader/lazy_binary_reader_test.go @@ -28,8 +28,8 @@ import ( "github.com/thanos-io/objstore/providers/filesystem" "go.uber.org/atomic" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/tsdb/block" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/util/test" ) diff --git a/pkg/storegateway/indexheader/reader_benchmarks_test.go b/pkg/storage/indexheader/reader_benchmarks_test.go similarity index 100% rename from pkg/storegateway/indexheader/reader_benchmarks_test.go rename to pkg/storage/indexheader/reader_benchmarks_test.go diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storage/indexheader/reader_pool.go similarity index 100% rename from pkg/storegateway/indexheader/reader_pool.go rename to pkg/storage/indexheader/reader_pool.go diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storage/indexheader/reader_pool_test.go similarity index 100% rename from pkg/storegateway/indexheader/reader_pool_test.go rename to pkg/storage/indexheader/reader_pool_test.go diff --git a/pkg/storegateway/indexheader/snapshotter.go b/pkg/storage/indexheader/snapshotter.go similarity index 100% rename from pkg/storegateway/indexheader/snapshotter.go rename to pkg/storage/indexheader/snapshotter.go diff --git a/pkg/storegateway/indexheader/snapshotter_test.go b/pkg/storage/indexheader/snapshotter_test.go similarity index 100% rename from pkg/storegateway/indexheader/snapshotter_test.go rename to pkg/storage/indexheader/snapshotter_test.go diff --git a/pkg/storegateway/indexheader/stream_binary_reader.go b/pkg/storage/indexheader/stream_binary_reader.go similarity index 85% rename from pkg/storegateway/indexheader/stream_binary_reader.go rename to pkg/storage/indexheader/stream_binary_reader.go index 88d9ab1657b..4fa3cafd444 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader.go +++ b/pkg/storage/indexheader/stream_binary_reader.go @@ -24,10 +24,10 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/objstore" + streamencoding "github.com/grafana/mimir/pkg/storage/indexheader/encoding" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" + "github.com/grafana/mimir/pkg/storage/indexheader/indexheaderpb" "github.com/grafana/mimir/pkg/storage/tsdb/block" - streamencoding "github.com/grafana/mimir/pkg/storegateway/indexheader/encoding" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" - "github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb" "github.com/grafana/mimir/pkg/util/atomicfs" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -93,7 +93,9 @@ func NewStreamBinaryReader(ctx context.Context, logger log.Logger, bkt objstore. } // newFileStreamBinaryReader loads sparse index-headers from disk or constructs it from the index-header if not available. -func newFileStreamBinaryReader(binPath string, id ulid.ULID, sparseHeadersPath string, postingOffsetsInMemSampling int, logger *spanlogger.SpanLogger, metrics *StreamBinaryReaderMetrics, cfg Config) (bw *StreamBinaryReader, err error) { +func newFileStreamBinaryReader(binPath string, id ulid.ULID, sparseHeadersPath string, postingOffsetsInMemSampling int, logger log.Logger, metrics *StreamBinaryReaderMetrics, cfg Config) (bw *StreamBinaryReader, err error) { + logger = log.With(logger, "id", id, "path", sparseHeadersPath, "inmem_sampling_rate", postingOffsetsInMemSampling) + r := &StreamBinaryReader{ factory: streamencoding.NewDecbufFactory(binPath, cfg.MaxIdleFileHandles, metrics.decbufFactory), } @@ -141,33 +143,37 @@ func newFileStreamBinaryReader(binPath string, id ulid.ULID, sparseHeadersPath s // Load in sparse symbols and postings offset table; from disk if this is a v2 index. if r.indexVersion == index.FormatV2 { + var reconstruct bool sparseData, err := os.ReadFile(sparseHeadersPath) if err != nil && !os.IsNotExist(err) { - level.Warn(logger).Log("msg", "failed to read sparse index-headers from disk; recreating", "id", id, "err", err) + level.Warn(logger).Log("msg", "failed to read sparse index-headers from disk; recreating", "err", err) + } + + if err == nil { + if err = r.loadFromSparseIndexHeader(logger, sparseData, postingOffsetsInMemSampling); err != nil { + reconstruct = true + } } - // If sparseHeaders are not on disk, construct sparseHeaders and write to disk. - if err != nil { - if err = r.loadFromIndexHeader(logger, id, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil { + // reconstruct from index if the sparse index-header file isn't on disk or if the sampling rate of the headers + // on disk can't be downsampled to the desired rate. + if err != nil || reconstruct { + if err = r.loadFromIndexHeader(logger, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil { return nil, fmt.Errorf("cannot load sparse index-header: %w", err) } - if err := writeSparseHeadersToFile(logger, id, sparseHeadersPath, r); err != nil { + if err := writeSparseHeadersToFile(logger, sparseHeadersPath, r); err != nil { return nil, fmt.Errorf("cannot write sparse index-header to disk: %w", err) } - - level.Debug(logger).Log("msg", "built sparse index-header file", "id", id, "path", sparseHeadersPath) - } else { - // Otherwise, read persisted sparseHeaders from disk to memory. - if err = r.loadFromSparseIndexHeader(logger, id, sparseHeadersPath, sparseData, postingOffsetsInMemSampling); err != nil { - return nil, fmt.Errorf("cannot load sparse index-header from disk: %w", err) - } + level.Debug(logger).Log("msg", "built sparse index-header file") } + } else { - if err = r.loadFromIndexHeader(logger, id, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil { + if err = r.loadFromIndexHeader(logger, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil { return nil, fmt.Errorf("cannot load sparse index-header: %w", err) } } + level.Debug(logger).Log("msg", "built sparse index-header file") labelNames, err := r.postingsOffsetTable.LabelNames() if err != nil { return nil, fmt.Errorf("cannot load label names from postings offset table: %w", err) @@ -185,13 +191,13 @@ func newFileStreamBinaryReader(binPath string, id ulid.ULID, sparseHeadersPath s } // loadFromSparseIndexHeader load from sparse index-header on disk. -func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLogger, id ulid.ULID, sparseHeadersPath string, sparseData []byte, postingOffsetsInMemSampling int) (err error) { +func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger log.Logger, sparseData []byte, postingOffsetsInMemSampling int) (err error) { start := time.Now() defer func() { - level.Info(logger).Log("msg", "loaded sparse index-header from disk", "id", id, "path", sparseHeadersPath, "elapsed", time.Since(start)) + level.Info(logger).Log("msg", "loaded sparse index-header from disk", "elapsed", time.Since(start)) }() - level.Info(logger).Log("msg", "loading sparse index-header from disk", "id", id, "path", sparseHeadersPath) + level.Info(logger).Log("msg", "loading sparse index-header from disk") sparseHeaders := &indexheaderpb.Sparse{} gzipped := bytes.NewReader(sparseData) @@ -224,13 +230,13 @@ func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLo } // loadFromIndexHeader loads in symbols and postings offset table from the index-header. -func (r *StreamBinaryReader) loadFromIndexHeader(logger *spanlogger.SpanLogger, id ulid.ULID, cfg Config, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int) (err error) { +func (r *StreamBinaryReader) loadFromIndexHeader(logger log.Logger, cfg Config, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int) (err error) { start := time.Now() defer func() { - level.Info(logger).Log("msg", "loaded sparse index-header from full index-header", "id", id, "elapsed", time.Since(start)) + level.Info(logger).Log("msg", "loaded sparse index-header from full index-header", "elapsed", time.Since(start)) }() - level.Info(logger).Log("msg", "loading sparse index-header from full index-header", "id", id) + level.Info(logger).Log("msg", "loading sparse index-header from full index-header") r.symbols, err = streamindex.NewSymbols(r.factory, r.indexVersion, int(r.toc.Symbols), cfg.VerifyOnLoad) if err != nil { @@ -246,13 +252,13 @@ func (r *StreamBinaryReader) loadFromIndexHeader(logger *spanlogger.SpanLogger, } // writeSparseHeadersToFile uses protocol buffer to write StreamBinaryReader to disk at sparseHeadersPath. -func writeSparseHeadersToFile(logger *spanlogger.SpanLogger, id ulid.ULID, sparseHeadersPath string, reader *StreamBinaryReader) error { +func writeSparseHeadersToFile(logger log.Logger, sparseHeadersPath string, reader *StreamBinaryReader) error { start := time.Now() defer func() { - level.Info(logger).Log("msg", "wrote sparse index-header to disk", "id", id, "path", sparseHeadersPath, "elapsed", time.Since(start)) + level.Info(logger).Log("msg", "wrote sparse index-header to disk", "elapsed", time.Since(start)) }() - level.Info(logger).Log("msg", "writing sparse index-header to disk", "id", id, "path", sparseHeadersPath) + level.Info(logger).Log("msg", "writing sparse index-header to disk") sparseHeaders := &indexheaderpb.Sparse{} sparseHeaders.Symbols = reader.symbols.NewSparseSymbol() diff --git a/pkg/storegateway/indexheader/stream_binary_reader_test.go b/pkg/storage/indexheader/stream_binary_reader_test.go similarity index 96% rename from pkg/storegateway/indexheader/stream_binary_reader_test.go rename to pkg/storage/indexheader/stream_binary_reader_test.go index 219b9b0c109..1335c3c4fb2 100644 --- a/pkg/storegateway/indexheader/stream_binary_reader_test.go +++ b/pkg/storage/indexheader/stream_binary_reader_test.go @@ -16,8 +16,8 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore/providers/filesystem" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/tsdb/block" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -50,7 +50,7 @@ func TestStreamBinaryReader_ShouldBuildSparseHeadersFromFileSimple(t *testing.T) require.NoError(t, err) logger := spanlogger.FromContext(context.Background(), log.NewNopLogger()) - err = r.loadFromSparseIndexHeader(logger, blockID, sparseHeadersPath, sparseData, 3) + err = r.loadFromSparseIndexHeader(logger, sparseData, 3) require.NoError(t, err) } @@ -91,6 +91,7 @@ func TestStreamBinaryReader_CheckSparseHeadersCorrectnessExtensive(t *testing.T) // Check correctness of sparse index headers. compareIndexToHeader(t, b, r2) + compareIndexToHeaderPostings(t, b, r2) }) } } diff --git a/pkg/storegateway/indexheader/testdata/index_format_v1/chunks/.gitkeep b/pkg/storage/indexheader/testdata/index_format_v1/chunks/.gitkeep similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v1/chunks/.gitkeep rename to pkg/storage/indexheader/testdata/index_format_v1/chunks/.gitkeep diff --git a/pkg/storegateway/indexheader/testdata/index_format_v1/index b/pkg/storage/indexheader/testdata/index_format_v1/index similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v1/index rename to pkg/storage/indexheader/testdata/index_format_v1/index diff --git a/pkg/storegateway/indexheader/testdata/index_format_v1/meta.json b/pkg/storage/indexheader/testdata/index_format_v1/meta.json similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v1/meta.json rename to pkg/storage/indexheader/testdata/index_format_v1/meta.json diff --git a/pkg/storegateway/indexheader/testdata/index_format_v2/chunks/.gitkeep b/pkg/storage/indexheader/testdata/index_format_v2/chunks/.gitkeep similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v2/chunks/.gitkeep rename to pkg/storage/indexheader/testdata/index_format_v2/chunks/.gitkeep diff --git a/pkg/storegateway/indexheader/testdata/index_format_v2/index b/pkg/storage/indexheader/testdata/index_format_v2/index similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v2/index rename to pkg/storage/indexheader/testdata/index_format_v2/index diff --git a/pkg/storegateway/indexheader/testdata/index_format_v2/meta.json b/pkg/storage/indexheader/testdata/index_format_v2/meta.json similarity index 100% rename from pkg/storegateway/indexheader/testdata/index_format_v2/meta.json rename to pkg/storage/indexheader/testdata/index_format_v2/meta.json diff --git a/pkg/storage/tsdb/block/block.go b/pkg/storage/tsdb/block/block.go index be8d04ee3ca..e53f8820fbb 100644 --- a/pkg/storage/tsdb/block/block.go +++ b/pkg/storage/tsdb/block/block.go @@ -121,6 +121,15 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blockDi return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index")) } + src := filepath.Join(blockDir, SparseIndexHeaderFilename) + dst := filepath.Join(id.String(), SparseIndexHeaderFilename) + if _, err := os.Stat(src); err == nil { + if err := objstore.UploadFile(ctx, logger, bkt, src, dst); err != nil { + // Don't call cleanUp. Uploading sparse index headers is best effort. + level.Warn(logger).Log("msg", "failed to upload sparse index headers", "block", id.String(), "err", err) + } + } + // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads. if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil { // Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases, diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 6c7dfd2d573..b7ea189b6e0 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -20,7 +20,7 @@ import ( "github.com/grafana/mimir/pkg/ingester/activeseries" "github.com/grafana/mimir/pkg/storage/bucket" - "github.com/grafana/mimir/pkg/storegateway/indexheader" + "github.com/grafana/mimir/pkg/storage/indexheader" ) const ( diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 1fd9dd72758..bd7ac467701 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -45,14 +45,14 @@ import ( "google.golang.org/grpc/status" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/storage/indexheader" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/sharding" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storage/tsdb/bucketcache" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" - "github.com/grafana/mimir/pkg/storegateway/indexheader" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 72976f55b6e..32d3f384327 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -34,10 +34,10 @@ import ( "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" + "github.com/grafana/mimir/pkg/storage/indexheader" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storegateway/indexcache" - "github.com/grafana/mimir/pkg/storegateway/indexheader" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util/test" ) diff --git a/pkg/storegateway/bucket_index_postings.go b/pkg/storegateway/bucket_index_postings.go index 6111374c6cc..98b7022f4ba 100644 --- a/pkg/storegateway/bucket_index_postings.go +++ b/pkg/storegateway/bucket_index_postings.go @@ -17,10 +17,10 @@ import ( "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" + "github.com/grafana/mimir/pkg/storage/indexheader" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/sharding" "github.com/grafana/mimir/pkg/storage/tsdb" - "github.com/grafana/mimir/pkg/storegateway/indexheader" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" ) // rawPostingGroup keeps posting keys for single matcher. It is raw because there is no guarantee diff --git a/pkg/storegateway/bucket_index_postings_test.go b/pkg/storegateway/bucket_index_postings_test.go index 3f0eccdb667..12e1d566708 100644 --- a/pkg/storegateway/bucket_index_postings_test.go +++ b/pkg/storegateway/bucket_index_postings_test.go @@ -15,7 +15,7 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "github.com/stretchr/testify/assert" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" ) func TestBigEndianPostingsCount(t *testing.T) { diff --git a/pkg/storegateway/bucket_index_reader.go b/pkg/storegateway/bucket_index_reader.go index f97d1507235..8b744661a45 100644 --- a/pkg/storegateway/bucket_index_reader.go +++ b/pkg/storegateway/bucket_index_reader.go @@ -31,10 +31,10 @@ import ( "github.com/prometheus/prometheus/tsdb/index" "golang.org/x/sync/errgroup" + "github.com/grafana/mimir/pkg/storage/indexheader" + streamindex "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storegateway/indexcache" - "github.com/grafana/mimir/pkg/storegateway/indexheader" - streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/spanlogger" diff --git a/pkg/storegateway/bucket_store_metrics.go b/pkg/storegateway/bucket_store_metrics.go index 4a411be1dff..e6b3255494f 100644 --- a/pkg/storegateway/bucket_store_metrics.go +++ b/pkg/storegateway/bucket_store_metrics.go @@ -12,7 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/grafana/mimir/pkg/storegateway/indexheader" + "github.com/grafana/mimir/pkg/storage/indexheader" ) // BucketStoreMetrics holds all the metrics tracked by BucketStore. These metrics diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 2ea825294db..1f0cab5c9e2 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -56,13 +56,13 @@ import ( "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/storage/bucket" + "github.com/grafana/mimir/pkg/storage/indexheader" + "github.com/grafana/mimir/pkg/storage/indexheader/index" "github.com/grafana/mimir/pkg/storage/sharding" mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storegateway/hintspb" "github.com/grafana/mimir/pkg/storegateway/indexcache" - "github.com/grafana/mimir/pkg/storegateway/indexheader" - "github.com/grafana/mimir/pkg/storegateway/indexheader/index" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util/pool" "github.com/grafana/mimir/pkg/util/test" diff --git a/pkg/storegateway/indexheader/index/postings_test.go b/pkg/storegateway/indexheader/index/postings_test.go deleted file mode 100644 index 2a7cd943265..00000000000 --- a/pkg/storegateway/indexheader/index/postings_test.go +++ /dev/null @@ -1,102 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package index - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestPostingValueOffsets(t *testing.T) { - testCases := map[string]struct { - existingOffsets []postingOffset - prefix string - expectedFound bool - expectedStart int - expectedEnd int - }{ - "prefix not found": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "a", - expectedFound: false, - }, - "prefix matches only one sampled offset": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "02", - expectedFound: true, - expectedStart: 1, - expectedEnd: 2, - }, - "prefix matches all offsets": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "0", - expectedFound: true, - expectedStart: 0, - expectedEnd: 4, - }, - "prefix matches only last offset": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "031", - expectedFound: true, - expectedStart: 3, - expectedEnd: 4, - }, - "prefix matches multiple offsets": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "020"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "02", - expectedFound: true, - expectedStart: 1, - expectedEnd: 3, - }, - "prefix matches only first offset": { - existingOffsets: []postingOffset{ - {value: "010"}, - {value: "019"}, - {value: "020"}, - {value: "030"}, - {value: "031"}, - }, - prefix: "015", - expectedFound: true, - expectedStart: 0, - expectedEnd: 1, - }, - } - - for testName, testCase := range testCases { - t.Run(testName, func(t *testing.T) { - offsets := postingValueOffsets{offsets: testCase.existingOffsets} - start, end, found := offsets.prefixOffsets(testCase.prefix) - assert.Equal(t, testCase.expectedStart, start) - assert.Equal(t, testCase.expectedEnd, end) - assert.Equal(t, testCase.expectedFound, found) - }) - } -}