Compactor: Upload Sparse Index Headers to Object Storage (#10684)
* mv indexheader into utils

* fmt + benchmarks for BinaryWrite

* update benchmarks

* update tests on compactor e2e

* update tests on compactor e2e

* fix err handling in WriteBinary

* mv indexheader to pkg/storage

* rm change to BinaryWrite

* rm unused import related to BinaryWrite

* pass uploadSparseIndexHeaders through Config + update docs

* update docs

* docs

* docs

* handrail comments; rm TODO

* comments

* add handling for configured sampling rate != sparse-index-header sampling rate

* add comments on DownsamplePostings

* updates to downsampling

* golangci-lint

* add todo comment on test, can pass unexpectedly

* update to tests

* review comments

* address review comments

* golint

* pass config through init functions

* update downsampling in NewPostingOffsetTableFromSparseHeader to always take last

* fix postings to pass TestStreamBinaryReader_CheckSparseHeadersCorrectnessExtensive

* fix postings to pass TestStreamBinaryReader_CheckSparseHeadersCorrectnessExtensive

* stat sparse index headers before block upload; no warning on failed upload if not on fs

* posting sampling tests

* update header sampling tests

* split runCompactionJob upload into multiple concurrency.ForEachJob

* update changelog.md

* golint

* Update CHANGELOG.md

* Update CHANGELOG.md

* Revert "Update CHANGELOG.md"

This reverts commit 92b4610.

* Update pkg/compactor/bucket_compactor.go

Co-authored-by: Dimitar Dimitrov <[email protected]>

* add struct fields on test

* rework downsampling tests; require first and last

* add check for first and last table offsets to CheckSparseHeadersCorrectnessExtensive

* check all ranges in index are in header

* comment on offset adjust

* Update docs/sources/mimir/configure/configuration-parameters/index.md

Co-authored-by: Taylor C <[email protected]>

* update docs

---------

Co-authored-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Taylor C <[email protected]>
3 people authored Mar 7, 2025
1 parent 42198d3 commit 769906a
Showing 50 changed files with 737 additions and 227 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -6,6 +6,7 @@

* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702
* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [CHANGE] Querier: pass context to queryable `IsApplicable` hook. #10451
* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
@@ -17,7 +18,7 @@
* [CHANGE] Ruler: Add `user` and `reason` labels to `cortex_ruler_write_requests_failed_total` and `cortex_ruler_queries_failed_total`; add `user` to
`cortex_ruler_write_requests_total` and `cortex_ruler_queries_total` metrics. #10536
* [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660 #10712
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [CHANGE] Store-gateway: Include the posting sampling rate in sparse index headers. When the sampling rate isn't set in a sparse index header, the store-gateway rebuilds the sparse header with the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` value. If the sparse header's sampling rate is set but doesn't match the configured rate, the store-gateway either rebuilds the sparse header or downsamples it to the configured sampling rate. #10684
* [ENHANCEMENT] Compactor: Expose `cortex_bucket_index_last_successful_update_timestamp_seconds` for all tenants assigned to the compactor before starting the block cleanup job. #10569
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
@@ -64,6 +65,7 @@
* [ENHANCEMENT] All: Add `cortex_client_request_invalid_cluster_validation_labels_total` metrics, that is used by Mimir's gRPC clients to track invalid cluster validations. #10767
* [ENHANCEMENT] Ingester client: Add support to configure cluster validation for ingester clients. Failed cluster validations are tracked by `cortex_client_request_invalid_cluster_validation_labels_total` with label `client=ingester`. #10767
* [ENHANCEMENT] Add experimental metric `cortex_distributor_dropped_native_histograms_total` to measure native histograms silently dropped when native histograms are disabled for a tenant. #10760
* [ENHANCEMENT] Compactor: Add experimental `-compactor.upload-sparse-index-headers` option. When enabled, the compactor will attempt to upload sparse index headers to object storage. This prevents latency spikes after adding store-gateway replicas. #10684
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
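The Store-gateway [CHANGE] entry above leaves implicit when an uploaded sparse header can be reused and when it must be rebuilt. The sketch below is illustrative only: the helper name and the divisibility condition for downsampling are assumptions about how a header sampled at one rate could be thinned to the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` rate, not the actual Mimir implementation.

```go
// decideSparseHeaderReuse sketches the behaviour described in the CHANGE entry:
// rebuild when the header records no sampling rate, reuse when the rates match,
// and otherwise downsample only when the configured rate can be derived from the
// header's entries (assumed here to require divisibility).
func decideSparseHeaderReuse(headerRate, configuredRate int) string {
	switch {
	case headerRate == 0:
		return "rebuild" // no sampling rate recorded in the sparse header
	case headerRate == configuredRate:
		return "reuse" // header already matches the configured rate
	case configuredRate > headerRate && configuredRate%headerRate == 0:
		return "downsample" // keep every (configuredRate/headerRate)-th posting offset
	default:
		return "rebuild" // configured rate can't be derived by thinning the header
	}
}
```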
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -11618,6 +11618,17 @@
"fieldFlag": "compactor.max-lookback",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "upload_sparse_index_headers",
"required": false,
"desc": "If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "compactor.upload-sparse-index-headers",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1297,6 +1297,8 @@ Usage of ./cmd/mimir/mimir:
Number of symbols flushers used when doing split compaction. (default 1)
-compactor.tenant-cleanup-delay duration
For tenants marked for deletion, this is the time between deletion of the last block, and doing final cleanup (marker files, debug files) of the tenant. (default 6h0m0s)
-compactor.upload-sparse-index-headers
[experimental] If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.
-config.expand-env
Expands ${var} or $var in config according to the values of the environment variables.
-config.file value
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -71,6 +71,8 @@ The following features are currently experimental:
- `-compactor.in-memory-tenant-meta-cache-size`
- Limit blocks processed in each compaction cycle. Blocks uploaded prior to the maximum lookback aren't processed.
- `-compactor.max-lookback`
- Enable the compactor to upload sparse index headers to object storage during compaction cycles.
- `-compactor.upload-sparse-index-headers`
- Ruler
- Aligning of evaluation timestamp on interval (`align_evaluation_time_on_interval`)
- Allow defining limits on the maximum number of rules allowed in a rule group by namespace and the maximum number of rule groups by namespace. If set, this supersedes the `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` limits.
docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4971,6 +4971,13 @@ sharding_ring:
# blocks are considered regardless of their upload time.
# CLI flag: -compactor.max-lookback
[max_lookback: <duration> | default = 0s]

# (experimental) If enabled, the compactor constructs and uploads sparse index
# headers to object storage during each compaction cycle. This allows
# store-gateway instances to use the sparse headers from object storage instead
# of recreating them locally.
# CLI flag: -compactor.upload-sparse-index-headers
[upload_sparse_index_headers: <boolean> | default = false]
```
### store_gateway
119 changes: 85 additions & 34 deletions pkg/compactor/bucket_compactor.go
@@ -29,8 +29,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -394,25 +396,26 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger)

// update labels and verify all blocks
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]

uploadedBlocks.Inc()

bdir := filepath.Join(subDir, blockToUpload.ulid.String())

// When splitting is enabled, we need to inject the shard ID as an external label.
newLabels := job.Labels().Map()
if job.UseSplitting() {
newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
}
blocksToUpload[idx].labels = newLabels

newMeta, err := block.InjectThanosMeta(jobLogger, bdir, block.ThanosMeta{
Labels: newLabels,
Downsample: block.ThanosDownsample{Resolution: job.Resolution()},
Source: block.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)

if err != nil {
return errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
@@ -421,18 +424,47 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return errors.Wrap(err, "remove tombstones")
}

// Ensure the compacted block is valid.
if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
return errors.Wrapf(err, "invalid result block %s", bdir)
}
return nil
})
if err != nil {
return false, nil, err
}

// Optionally build sparse-index-headers. Building sparse-index-headers is best effort; we do not skip uploading a
// compacted block if there's an error affecting sparse-index-headers.
if c.uploadSparseIndexHeaders {
// Create a bucket backed by the local compaction directory, which allows prepareSparseIndexHeader to
// construct sparse-index-headers without making requests to object storage.
fsbkt, err := filesystem.NewBucket(subDir)
if err != nil {
level.Warn(jobLogger).Log("msg", "failed to create filesystem bucket, skipping sparse header upload", "err", err)
return
}
_ = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]
err := prepareSparseIndexHeader(ctx, jobLogger, fsbkt, subDir, blockToUpload.ulid, c.sparseIndexHeaderSamplingRate, c.sparseIndexHeaderconfig)
if err != nil {
level.Warn(jobLogger).Log("msg", "failed to create sparse index headers", "block", blockToUpload.ulid.String(), "shard", blockToUpload.shardIndex, "err", err)
}
return nil
})
}

// upload all blocks
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]
uploadedBlocks.Inc()
bdir := filepath.Join(subDir, blockToUpload.ulid.String())
begin := time.Now()
if err := block.Upload(ctx, jobLogger, c.bkt, bdir, nil); err != nil {
return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid)
}

elapsed := time.Since(begin)
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(blockToUpload.labels))
return nil
})
if err != nil {
@@ -457,10 +489,19 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
}
}

return true, compIDs, nil
}

func prepareSparseIndexHeader(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string, id ulid.ULID, sampling int, cfg indexheader.Config) error {
// Calling NewStreamBinaryReader reads a block's index and writes a sparse-index-header to disk.
mets := indexheader.NewStreamBinaryReaderMetrics(nil)
br, err := indexheader.NewStreamBinaryReader(ctx, logger, bkt, dir, id, sampling, mets, cfg)
if err != nil {
return err
}
return br.Close()
}

// verifyCompactedBlocksTimeRanges does a full run over the compacted blocks
// and verifies that they satisfy the min/maxTime from the source blocks
func verifyCompactedBlocksTimeRanges(compIDs []ulid.ULID, sourceBlocksMinTime, sourceBlocksMaxTime int64, subDir string) error {
@@ -530,6 +571,7 @@ func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob
type ulidWithShardIndex struct {
ulid ulid.ULID
shardIndex int
labels map[string]string
}

// issue347Error is a type wrapper for errors that should invoke the repair process for broken block.
@@ -747,20 +789,23 @@ var ownAllJobs = func(*Job) (bool, error) {

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
sy *metaSyncer
grouper Grouper
comp Compactor
planner Planner
compactDir string
bkt objstore.Bucket
concurrency int
skipUnhealthyBlocks bool
ownJob ownCompactionJobFunc
sortJobs JobsOrderFunc
waitPeriod time.Duration
blockSyncConcurrency int
metrics *BucketCompactorMetrics
logger log.Logger
sy *metaSyncer
grouper Grouper
comp Compactor
planner Planner
compactDir string
bkt objstore.Bucket
concurrency int
skipUnhealthyBlocks bool
uploadSparseIndexHeaders bool
sparseIndexHeaderSamplingRate int
sparseIndexHeaderconfig indexheader.Config
ownJob ownCompactionJobFunc
sortJobs JobsOrderFunc
waitPeriod time.Duration
blockSyncConcurrency int
metrics *BucketCompactorMetrics
}

// NewBucketCompactor creates a new bucket compactor.
@@ -779,25 +824,31 @@ func NewBucketCompactor(
waitPeriod time.Duration,
blockSyncConcurrency int,
metrics *BucketCompactorMetrics,
uploadSparseIndexHeaders bool,
sparseIndexHeaderSamplingRate int,
sparseIndexHeaderconfig indexheader.Config,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
}
return &BucketCompactor{
logger: logger,
sy: sy,
grouper: grouper,
planner: planner,
comp: comp,
compactDir: compactDir,
bkt: bkt,
concurrency: concurrency,
skipUnhealthyBlocks: skipUnhealthyBlocks,
ownJob: ownJob,
sortJobs: sortJobs,
waitPeriod: waitPeriod,
blockSyncConcurrency: blockSyncConcurrency,
metrics: metrics,
logger: logger,
sy: sy,
grouper: grouper,
planner: planner,
comp: comp,
compactDir: compactDir,
bkt: bkt,
concurrency: concurrency,
skipUnhealthyBlocks: skipUnhealthyBlocks,
ownJob: ownJob,
sortJobs: sortJobs,
waitPeriod: waitPeriod,
blockSyncConcurrency: blockSyncConcurrency,
metrics: metrics,
uploadSparseIndexHeaders: uploadSparseIndexHeaders,
sparseIndexHeaderSamplingRate: sparseIndexHeaderSamplingRate,
sparseIndexHeaderconfig: sparseIndexHeaderconfig,
}, nil
}

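To make the new flow concrete, here is a minimal, self-contained sketch of the side-effect-based API used by `prepareSparseIndexHeader` above: wrap the local compaction output directory in a filesystem-backed bucket so that building the header never touches remote object storage, then let `indexheader.NewStreamBinaryReader` persist the sparse-index-header as it reads the block's index. The calls mirror those in the diff; the import paths not shown in the diff, the package name, and the sampling rate of 32 (the value used in the tests below) are assumptions.

```go
package sparseheadersketch

import (
	"context"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore/providers/filesystem"

	"github.com/grafana/mimir/pkg/storage/indexheader"
)

// buildSparseHeaderLocally writes a sparse-index-header for the block at
// <compactDir>/<blockID>/ as a side effect of opening a StreamBinaryReader,
// mirroring prepareSparseIndexHeader in the diff above.
func buildSparseHeaderLocally(ctx context.Context, logger log.Logger, compactDir string, blockID ulid.ULID) error {
	// Bucket rooted at the local compaction directory, so no remote requests are made.
	fsbkt, err := filesystem.NewBucket(compactDir)
	if err != nil {
		return err
	}
	// NewStreamBinaryReader reads the block's index and persists the sparse header
	// to disk; closing the reader releases its resources.
	mets := indexheader.NewStreamBinaryReaderMetrics(nil)
	br, err := indexheader.NewStreamBinaryReader(ctx, logger, fsbkt, compactDir, blockID, 32, mets, indexheader.Config{VerifyOnLoad: true})
	if err != nil {
		return err
	}
	return br.Close()
}
```

Because the header is written next to the block's index before `block.Upload` runs, it is included in the object-storage upload, which appears to be what the e2e test below asserts by checking for `block.SparseIndexHeaderFilename`.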
21 changes: 20 additions & 1 deletion pkg/compactor/bucket_compactor_e2e_test.go
@@ -37,6 +37,7 @@ import (
"github.com/thanos-io/objstore/providers/filesystem"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
util_log "github.com/grafana/mimir/pkg/util/log"
)
@@ -240,7 +241,10 @@ func TestGroupCompactE2E(t *testing.T) {
planner := NewSplitAndMergePlanner([]int64{1000, 3000})
grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger)
metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, prometheus.NewPedanticRegistry())
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics)
cfg := indexheader.Config{VerifyOnLoad: true}
bComp, err := NewBucketCompactor(
logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics, true, 32, cfg,
)
require.NoError(t, err)

// Compaction on empty should not fail.
@@ -374,6 +378,21 @@
return nil
}))

// expect the blocks that are compacted to have sparse-index-headers in object storage.
require.NoError(t, bkt.Iter(ctx, "", func(n string) error {
id, ok := block.IsBlockDir(n)
if !ok {
return nil
}

if _, ok := others[id.String()]; ok {
p := path.Join(id.String(), block.SparseIndexHeaderFilename)
exists, _ := bkt.Exists(ctx, p)
assert.True(t, exists, "expected sparse index headers not found %s", p)
}
return nil
}))

for id, found := range nonCompactedExpected {
assert.True(t, found, "not found expected block %s", id.String())
}
7 changes: 5 additions & 2 deletions pkg/compactor/bucket_compactor_test.go
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/util/extprom"
)
@@ -118,9 +119,10 @@ func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
}

m := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil)
cfg := indexheader.Config{VerifyOnLoad: true}
for testName, testCase := range tests {
t.Run(testName, func(t *testing.T) {
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m, false, 32, cfg)
require.NoError(t, err)

res, err := bc.filterOwnJobs(jobsFn())
@@ -155,8 +157,9 @@ func TestBlockMaxTimeDeltas(t *testing.T) {
}))

metrics := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil)
cfg := indexheader.Config{VerifyOnLoad: true}
now := time.UnixMilli(1500002900159)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics, true, 32, cfg)
require.NoError(t, err)

deltas := bc.blockMaxTimeDeltas(now, []*Job{j1, j2})