Compactor: Upload Sparse Index Headers to Object Storage #10684

Merged: 49 commits from dwilson/upload-sparse-headers-from-compactor into main on Mar 7, 2025
49 commits (diff below shows changes from 36 commits)
c814a03
mv indexheader into utils
dmwilson-grafana Feb 14, 2025
df725dc
fmt + benchmarks for BinaryWrite
dmwilson-grafana Feb 18, 2025
b3de32d
update benchmarks
dmwilson-grafana Feb 18, 2025
4c63eb9
update tests on compactor e2e
dmwilson-grafana Feb 18, 2025
7329292
update tests on compactor e2e
dmwilson-grafana Feb 18, 2025
6ce1d79
fix err handling in WriteBinary
dmwilson-grafana Feb 18, 2025
cc8b313
mv indexheader to pkg/storage
dmwilson-grafana Feb 19, 2025
d771282
rm change to BinaryWrite
dmwilson-grafana Feb 19, 2025
753572d
rm unused import related to BinaryWrite
dmwilson-grafana Feb 19, 2025
ccb4912
pass uploadSparseIndexHeaders through Config + update docs
dmwilson-grafana Feb 20, 2025
c432e3a
update docs
dmwilson-grafana Feb 20, 2025
fa47f62
docs
dmwilson-grafana Feb 20, 2025
d8de8fa
docs
dmwilson-grafana Feb 20, 2025
00f9b41
handrail comments; rm TODO
dmwilson-grafana Feb 20, 2025
ef039aa
comments
dmwilson-grafana Feb 21, 2025
24aad5f
Merge branch 'main' into dwilson/upload-sparse-headers-from-compactor
dmwilson-grafana Feb 21, 2025
fe8a12e
add handling for configured sampling rate != sparse-index-header samp…
dmwilson-grafana Feb 22, 2025
fd2d9fd
add comments on DownsamplePostings
dmwilson-grafana Feb 22, 2025
1ea4e57
updates to downsampling
dmwilson-grafana Feb 24, 2025
c362589
golangci-lint
dmwilson-grafana Feb 24, 2025
d766041
add todo comment on test, can pass unexpectedly
dmwilson-grafana Feb 24, 2025
13753ff
update to tests
dmwilson-grafana Feb 25, 2025
41d561b
review comments
dmwilson-grafana Feb 26, 2025
fb28d6e
address review comments
dmwilson-grafana Feb 26, 2025
35a4c1b
golint
dmwilson-grafana Feb 26, 2025
8ad719f
pass config through init functions
dmwilson-grafana Feb 27, 2025
1d7d402
update downsampling in NewPostingOffsetTableFromSparseHeader to alway…
dmwilson-grafana Feb 27, 2025
7905de5
fix postings to pass TestStreamBinaryReader_CheckSparseHeadersCorrect…
dmwilson-grafana Feb 27, 2025
cd77083
fix postings to pass TestStreamBinaryReader_CheckSparseHeadersCorrect…
dmwilson-grafana Feb 27, 2025
62b3c6f
stat sparse index headers before block upload; no warning on failed u…
dmwilson-grafana Feb 27, 2025
75d604b
posting sampling tests
dmwilson-grafana Feb 27, 2025
3663d67
update header sampling tests
dmwilson-grafana Feb 27, 2025
be12809
split runCompactionJob upload into multiple concurrency.ForEachJob
dmwilson-grafana Feb 27, 2025
f4a8034
update changelog.md
dmwilson-grafana Feb 28, 2025
5af108b
Merge branch 'main' into dwilson/upload-sparse-headers-from-compactor
dmwilson-grafana Feb 28, 2025
0977f15
golint
dmwilson-grafana Feb 28, 2025
92b4610
Update CHANGELOG.md
dimitarvdimitrov Feb 28, 2025
d2439ac
Update CHANGELOG.md
dimitarvdimitrov Feb 28, 2025
7949eaf
Revert "Update CHANGELOG.md"
dimitarvdimitrov Feb 28, 2025
b69e920
Update pkg/compactor/bucket_compactor.go
dmwilson-grafana Feb 28, 2025
ce90588
add struct fields on test
dmwilson-grafana Feb 28, 2025
19a1b49
rework downsampling tests; require first and last
dmwilson-grafana Feb 28, 2025
3426a41
add check for first and last table offsets to CheckSparseHeadersCorre…
dmwilson-grafana Mar 3, 2025
c90a8cd
fix conflicts changelog.md
dmwilson-grafana Mar 3, 2025
f2e8cbd
check all ranges in index are in header
dmwilson-grafana Mar 4, 2025
4ee0a03
Merge branch 'main' into dwilson/upload-sparse-headers-from-compactor
dmwilson-grafana Mar 4, 2025
c9ead14
comment on offset adjust
dmwilson-grafana Mar 4, 2025
0f98926
Update docs/sources/mimir/configure/configuration-parameters/index.md
dmwilson-grafana Mar 5, 2025
a604399
update docs
dmwilson-grafana Mar 5, 2025
6 changes: 4 additions & 2 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702
* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [CHANGE] Querier: pass context to queryable `IsApplicable` hook. #10451
* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
@@ -16,8 +17,8 @@
* [CHANGE] Ingester: Set `-ingester.ooo-native-histograms-ingestion-enabled` to true by default. #10483
* [CHANGE] Ruler: Add `user` and `reason` labels to `cortex_ruler_write_requests_failed_total` and `cortex_ruler_queries_failed_total`; add `user` to
`cortex_ruler_write_requests_total` and `cortex_ruler_queries_total` metrics. #10536
* [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660 #10712
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660
* [CHANGE] Store-gateway: Include posting sampling rate in sparse index headers. When the sampling rate isn't set in a sparse index header, store gateway will rebuild the sparse header with the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` value. If the sparse header's sampling rate is set, but doesn't match the configured rate, store gateway will either rebuild the sparse header or downsample to the configured sampling rate. #10684
* [ENHANCEMENT] Compactor: Expose `cortex_bucket_index_last_successful_update_timestamp_seconds` for all tenants assigned to the compactor before starting the block cleanup job. #10569
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
@@ -58,6 +59,7 @@
* `go_cpu_classes_gc_total_cpu_seconds_total`
* `go_cpu_classes_total_cpu_seconds_total`
* `go_cpu_classes_idle_cpu_seconds_total`
* [ENHANCEMENT] Compactor: Add experimental `-compactor.upload-sparse-index-headers` option. When enabled, the compactor will attempt to upload sparse index headers to object storage. This prevents latency spikes after adding store-gateway replicas. #10684
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
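The store-gateway entry above (#10684) implies a small decision procedure for handling a sparse index header whose recorded sampling rate differs from the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling`. Below is a minimal sketch of that decision, assuming downsampling is only possible when the configured rate is an exact multiple of the header's rate; the function and variable names are illustrative, not Mimir's actual API.

```go
// Sketch of the sampling-rate handling described in the store-gateway
// CHANGELOG entry above. All names are illustrative; the real logic
// lives in Mimir's indexheader package.
package main

import "fmt"

// reconcileSamplingRate decides what a store-gateway does with a sparse
// index header whose sampling rate may not match the configured
// blocks-storage.bucket-store.posting-offsets-in-mem-sampling value.
func reconcileSamplingRate(headerRate, configuredRate int) string {
	switch {
	case headerRate == 0:
		// Older headers don't record a rate: rebuild at the configured rate.
		return "rebuild"
	case headerRate == configuredRate:
		return "use as-is"
	case configuredRate%headerRate == 0:
		// Assumed precondition: the configured rate is an exact multiple of
		// the header's rate, so keeping every (configuredRate/headerRate)-th
		// sampled posting offset yields the configured sampling.
		return "downsample"
	default:
		// The header lacks offsets the configured rate needs: rebuild.
		return "rebuild"
	}
}

func main() {
	fmt.Println(reconcileSamplingRate(0, 32))  // rebuild
	fmt.Println(reconcileSamplingRate(32, 32)) // use as-is
	fmt.Println(reconcileSamplingRate(8, 32))  // downsample
	fmt.Println(reconcileSamplingRate(12, 32)) // rebuild
}
```

Commits fe8a12e, 1ea4e57, and 1d7d402 in the list above appear to be where this handling was introduced.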
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -11532,6 +11532,17 @@
"fieldFlag": "compactor.max-lookback",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "upload_sparse_index_headers",
"required": false,
"desc": "If enabled, the compactor will construct and upload sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "compactor.upload-sparse-index-headers",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1291,6 +1291,8 @@ Usage of ./cmd/mimir/mimir:
Number of symbols flushers used when doing split compaction. (default 1)
-compactor.tenant-cleanup-delay duration
For tenants marked for deletion, this is the time between deletion of the last block, and doing final cleanup (marker files, debug files) of the tenant. (default 6h0m0s)
-compactor.upload-sparse-index-headers
[experimental] If enabled, the compactor will construct and upload sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.
-config.expand-env
Expands ${var} or $var in config according to the values of the environment variables.
-config.file value
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -71,6 +71,8 @@ The following features are currently experimental:
- `-compactor.in-memory-tenant-meta-cache-size`
- Limit blocks processed in each compaction cycle. Blocks uploaded prior to the maximum lookback aren't processed.
- `-compactor.max-lookback`
- Enable the compactor to upload sparse index headers to object storage during compaction cycles.
- `-compactor.upload-sparse-index-headers`
- Ruler
- Aligning of evaluation timestamp on interval (`align_evaluation_time_on_interval`)
- Allow defining limits on the maximum number of rules allowed in a rule group by namespace and the maximum number of rule groups by namespace. If set, this supersedes the `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` limits.
docs/sources/mimir/configure/configuration-parameters/index.md
@@ -4960,6 +4960,13 @@ sharding_ring:
# blocks are considered regardless of their upload time.
# CLI flag: -compactor.max-lookback
[max_lookback: <duration> | default = 0s]

# (experimental) If enabled, the compactor will construct and upload sparse
# index headers to object storage during each compaction cycle. This allows
# store-gateway instances to use the sparse headers from object storage instead
# of recreating them locally.
# CLI flag: -compactor.upload-sparse-index-headers
[upload_sparse_index_headers: <boolean> | default = false]
```

### store_gateway
120 changes: 86 additions & 34 deletions pkg/compactor/bucket_compactor.go
@@ -29,8 +29,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/sharding"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
@@ -394,25 +396,26 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger)

// update labels and verify all blocks
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]

uploadedBlocks.Inc()

bdir := filepath.Join(subDir, blockToUpload.ulid.String())

// When splitting is enabled, we need to inject the shard ID as an external label.
newLabels := job.Labels().Map()
if job.UseSplitting() {
newLabels[mimir_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
}
blocksToUpload[idx].labels = newLabels

newMeta, err := block.InjectThanosMeta(jobLogger, bdir, block.ThanosMeta{
Labels: newLabels,
Downsample: block.ThanosDownsample{Resolution: job.Resolution()},
Source: block.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)

if err != nil {
return errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
@@ -421,18 +424,47 @@
return errors.Wrap(err, "remove tombstones")
}

// Ensure the compacted block is valid.
if err := block.VerifyBlock(ctx, jobLogger, bdir, newMeta.MinTime, newMeta.MaxTime, false); err != nil {
return errors.Wrapf(err, "invalid result block %s", bdir)
}
return nil
})
if err != nil {
return false, nil, err
}

// Optionally build sparse-index-headers. Building sparse-index-headers is best effort; we do not skip uploading a
// compacted block if there's an error affecting its sparse-index-headers.
if c.uploadSparseIndexHeaders {
// Create a bucket backed by the local compaction directory; this allows prepareSparseIndexHeader to
// construct sparse-index-headers without making requests to object storage.
fsbkt, err := filesystem.NewBucket(subDir)
if err != nil {
level.Warn(jobLogger).Log("msg", "failed to create filesystem bucket, skipping sparse header upload", "err", err)
return
}
_ = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]
err := prepareSparseIndexHeader(ctx, jobLogger, fsbkt, subDir, blockToUpload.ulid, c.sparseIndexHeaderSamplingRate, c.sparseIndexHeaderconfig)
if err != nil {
level.Warn(jobLogger).Log("msg", "failed to create sparse index headers", "block", blockToUpload.ulid.String(), "shard", blockToUpload.shardIndex, "err", err)
}
return nil
})
}

// upload all blocks
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]
uploadedBlocks.Inc()
bdir := filepath.Join(subDir, blockToUpload.ulid.String())
begin := time.Now()
if err := block.Upload(ctx, jobLogger, c.bkt, bdir, nil); err != nil {
return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid)
}

elapsed := time.Since(begin)
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(blockToUpload.labels))
return nil
})
if err != nil {
Expand All @@ -457,10 +489,20 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return false, nil, errors.Wrapf(err, "mark old block for deletion from bucket")
}
}

return true, compIDs, nil
}

func prepareSparseIndexHeader(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string, id ulid.ULID, sampling int, cfg indexheader.Config) error {
// Calling NewStreamBinaryReader reads a block's index and writes a sparse-index-header to disk.
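// The resulting sparse-index-header file lands in the block's directory, so the
// subsequent block.Upload in runCompactionJob uploads it along with the block.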
mets := indexheader.NewStreamBinaryReaderMetrics(nil)
br, err := indexheader.NewStreamBinaryReader(ctx, logger, bkt, dir, id, sampling, mets, cfg)
if err != nil {
return err
}
br.Close()
return nil
}

// verifyCompactedBlocksTimeRanges does a full run over the compacted blocks
// and verifies that they satisfy the min/maxTime from the source blocks
func verifyCompactedBlocksTimeRanges(compIDs []ulid.ULID, sourceBlocksMinTime, sourceBlocksMaxTime int64, subDir string) error {
@@ -530,6 +572,7 @@ func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob
type ulidWithShardIndex struct {
ulid ulid.ULID
shardIndex int
labels map[string]string
}

// issue347Error is a type wrapper for errors that should invoke the repair process for broken block.
@@ -747,20 +790,23 @@ var ownAllJobs = func(*Job) (bool, error) {

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
sy *metaSyncer
grouper Grouper
comp Compactor
planner Planner
compactDir string
bkt objstore.Bucket
concurrency int
skipUnhealthyBlocks bool
ownJob ownCompactionJobFunc
sortJobs JobsOrderFunc
waitPeriod time.Duration
blockSyncConcurrency int
metrics *BucketCompactorMetrics
logger log.Logger
sy *metaSyncer
grouper Grouper
comp Compactor
planner Planner
compactDir string
bkt objstore.Bucket
concurrency int
skipUnhealthyBlocks bool
uploadSparseIndexHeaders bool
sparseIndexHeaderSamplingRate int
sparseIndexHeaderconfig indexheader.Config
ownJob ownCompactionJobFunc
sortJobs JobsOrderFunc
waitPeriod time.Duration
blockSyncConcurrency int
metrics *BucketCompactorMetrics
}

// NewBucketCompactor creates a new bucket compactor.
@@ -779,25 +825,31 @@ func NewBucketCompactor(
waitPeriod time.Duration,
blockSyncConcurrency int,
metrics *BucketCompactorMetrics,
uploadSparseIndexHeaders bool,
sparseIndexHeaderSamplingRate int,
sparseIndexHeaderconfig indexheader.Config,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
}
return &BucketCompactor{
logger: logger,
sy: sy,
grouper: grouper,
planner: planner,
comp: comp,
compactDir: compactDir,
bkt: bkt,
concurrency: concurrency,
skipUnhealthyBlocks: skipUnhealthyBlocks,
ownJob: ownJob,
sortJobs: sortJobs,
waitPeriod: waitPeriod,
blockSyncConcurrency: blockSyncConcurrency,
metrics: metrics,
logger: logger,
sy: sy,
grouper: grouper,
planner: planner,
comp: comp,
compactDir: compactDir,
bkt: bkt,
concurrency: concurrency,
skipUnhealthyBlocks: skipUnhealthyBlocks,
ownJob: ownJob,
sortJobs: sortJobs,
waitPeriod: waitPeriod,
blockSyncConcurrency: blockSyncConcurrency,
metrics: metrics,
uploadSparseIndexHeaders: uploadSparseIndexHeaders,
sparseIndexHeaderSamplingRate: sparseIndexHeaderSamplingRate,
sparseIndexHeaderconfig: sparseIndexHeaderconfig,
}, nil
}

21 changes: 20 additions & 1 deletion pkg/compactor/bucket_compactor_e2e_test.go
@@ -37,6 +37,7 @@ import (
"github.com/thanos-io/objstore/providers/filesystem"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
util_log "github.com/grafana/mimir/pkg/util/log"
)
@@ -240,7 +241,10 @@ func TestGroupCompactE2E(t *testing.T) {
planner := NewSplitAndMergePlanner([]int64{1000, 3000})
grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger)
metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, prometheus.NewPedanticRegistry())
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics)
cfg := indexheader.Config{VerifyOnLoad: true}
bComp, err := NewBucketCompactor(
logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 0, 4, metrics, true, 32, cfg,
)
require.NoError(t, err)

// Compaction on empty should not fail.
@@ -374,6 +378,21 @@
return nil
}))

// expect the blocks that are compacted to have sparse-index-headers in object storage.
require.NoError(t, bkt.Iter(ctx, "", func(n string) error {
id, ok := block.IsBlockDir(n)
if !ok {
return nil
}

if _, ok := others[id.String()]; ok {
p := path.Join(id.String(), block.SparseIndexHeaderFilename)
exists, _ := bkt.Exists(ctx, p)
assert.True(t, exists, "expected sparse index headers not found %s", p)
}
return nil
}))

for id, found := range nonCompactedExpected {
assert.True(t, found, "not found expected block %s", id.String())
}
7 changes: 5 additions & 2 deletions pkg/compactor/bucket_compactor_test.go
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"

"github.com/grafana/mimir/pkg/storage/indexheader"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/util/extprom"
)
@@ -118,9 +119,10 @@ func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
}

m := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil)
cfg := indexheader.Config{VerifyOnLoad: true}
for testName, testCase := range tests {
t.Run(testName, func(t *testing.T) {
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, testCase.ownJob, nil, 0, 4, m, false, 32, cfg)
require.NoError(t, err)

res, err := bc.filterOwnJobs(jobsFn())
@@ -155,8 +157,9 @@ func TestBlockMaxTimeDeltas(t *testing.T) {
}))

metrics := NewBucketCompactorMetrics(promauto.With(nil).NewCounter(prometheus.CounterOpts{}), nil)
cfg := indexheader.Config{VerifyOnLoad: true}
now := time.UnixMilli(1500002900159)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics)
bc, err := NewBucketCompactor(log.NewNopLogger(), nil, nil, nil, nil, "", nil, 2, false, nil, nil, 0, 4, metrics, true, 32, cfg)
require.NoError(t, err)

deltas := bc.blockMaxTimeDeltas(now, []*Job{j1, j2})