Skip to content

Commit

Permalink
Compactor: make -compactor.max-lookback a per-tenant option (#10794)
Browse files Browse the repository at this point in the history
* resolve conflicts

* update comment
  • Loading branch information
dmwilson-grafana authored Mar 7, 2025
1 parent 769906a commit c7bd8bd
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* [ENHANCEMENT] Querier, ingester: The series API respects passed `limit` parameter. #10620 #10652
* [ENHANCEMENT] Store-gateway: Add experimental settings under `-store-gateway.dynamic-replication` to allow more than the default of 3 store-gateways to own recent blocks. #10382 #10637
* [ENHANCEMENT] Ingester: Add reactive concurrency limiters to protect push and read operations from overload. #10574
* [ENHANCEMENT] Compactor: Add experimental `-compactor.max-lookback` option to limit blocks considered in each compaction cycle. Blocks uploaded prior to the lookback period aren't processed. This option helps reduce CPU utilization in tenants with large block metadata files that are processed before each compaction. #10585
* [ENHANCEMENT] Compactor: Add experimental `-compactor.max-lookback` option to limit blocks considered in each compaction cycle. Blocks uploaded prior to the lookback period aren't processed. This option helps reduce CPU utilization in tenants with large block metadata files that are processed before each compaction. #10585 #10794
* [ENHANCEMENT] Distributor: Optionally expose the current HA replica for each tenant in the `cortex_ha_tracker_elected_replica_status` metric. This is enabled with the `-distributor.ha-tracker.enable-elected-replica-metric=true` flag. #10644
* [ENHANCEMENT] Enable three Go runtime metrics: #10641
* `go_cpu_classes_gc_total_cpu_seconds_total`
Expand Down
22 changes: 11 additions & 11 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -5071,6 +5071,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "compactor_max_lookback",
"required": false,
"desc": "Blocks uploaded before the lookback aren't considered in compactor cycles. If set, this value should be larger than all values in `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all blocks are considered regardless of their upload time.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "compactor.max-lookback",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "s3_sse_type",
Expand Down Expand Up @@ -11608,17 +11619,6 @@
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_lookback",
"required": false,
"desc": "Blocks uploaded before the lookback aren't considered in compactor cycles. If set, this value should be larger than all values in `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all blocks are considered regardless of their upload time.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "compactor.max-lookback",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "upload_sparse_index_headers",
Expand Down
14 changes: 7 additions & 7 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3932,6 +3932,13 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -compactor.block-upload-max-block-size-bytes
[compactor_block_upload_max_block_size_bytes: <int> | default = 0]

# (experimental) Blocks uploaded before the lookback aren't considered in
# compactor cycles. If set, this value should be larger than all values in
# `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all
# blocks are considered regardless of their upload time.
# CLI flag: -compactor.max-lookback
[compactor_max_lookback: <duration> | default = 0s]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down Expand Up @@ -4965,13 +4972,6 @@ sharding_ring:
# CLI flag: -compactor.compaction-jobs-order
[compaction_jobs_order: <string> | default = "smallest-range-oldest-blocks-first"]
# (experimental) Blocks uploaded before the lookback aren't considered in
# compactor cycles. If set, this value should be larger than all values in
# `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all
# blocks are considered regardless of their upload time.
# CLI flag: -compactor.max-lookback
[max_lookback: <duration> | default = 0s]

# (experimental) If enabled, the compactor constructs and uploads sparse index
# headers to object storage during each compaction cycle. This allows
# store-gateway instances to use the sparse headers from object storage instead
Expand Down
9 changes: 9 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,7 @@ type mockConfigProvider struct {
userPartialBlockDelayInvalid map[string]bool
verifyChunks map[string]bool
perTenantInMemoryCache map[string]int
maxLookback map[string]time.Duration
}

func newMockConfigProvider() *mockConfigProvider {
Expand All @@ -1558,6 +1559,7 @@ func newMockConfigProvider() *mockConfigProvider {
userPartialBlockDelayInvalid: make(map[string]bool),
verifyChunks: make(map[string]bool),
perTenantInMemoryCache: make(map[string]int),
maxLookback: make(map[string]time.Duration),
}
}

Expand Down Expand Up @@ -1625,6 +1627,13 @@ func (m *mockConfigProvider) S3SSEKMSEncryptionContext(string) string {
return ""
}

func (m *mockConfigProvider) CompactorMaxLookback(user string) time.Duration {
if result, ok := m.maxLookback[user]; ok {
return result
}
return 0
}

func (c *BlocksCleaner) runCleanupWithErr(ctx context.Context) error {
allUsers, isDeleted, err := c.refreshOwnedUsers(ctx)
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ type Config struct {
// Compactors sharding.
ShardingRing RingConfig `yaml:"sharding_ring"`

CompactionJobsOrder string `yaml:"compaction_jobs_order" category:"advanced"`
MaxLookback time.Duration `yaml:"max_lookback" category:"experimental"`
CompactionJobsOrder string `yaml:"compaction_jobs_order" category:"advanced"`

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
Expand Down Expand Up @@ -163,7 +162,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is the time between deletion of the last block, and doing final cleanup (marker files, debug files) of the tenant.")
f.BoolVar(&cfg.NoBlocksFileCleanupEnabled, "compactor.no-blocks-file-cleanup-enabled", false, "If enabled, will delete the bucket-index, markers and debug files in the tenant bucket when there are no blocks left in the index.")
f.DurationVar(&cfg.MaxLookback, "compactor.max-lookback", 0*time.Second, "Blocks uploaded before the lookback aren't considered in compactor cycles. If set, this value should be larger than all values in `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all blocks are considered regardless of their upload time.")
f.BoolVar(&cfg.UploadSparseIndexHeaders, "compactor.upload-sparse-index-headers", false, "If enabled, the compactor constructs and uploads sparse index headers to object storage during each compaction cycle. This allows store-gateway instances to use the sparse headers from object storage instead of recreating them locally.")

// compactor concurrency options
Expand Down Expand Up @@ -248,6 +246,10 @@ type ConfigProvider interface {

// CompactorInMemoryTenantMetaCacheSize returns number of parsed *Meta objects that we can keep in memory for the user between compactions.
CompactorInMemoryTenantMetaCacheSize(userID string) int

// CompactorMaxLookback returns the duration of the compactor lookback period, blocks uploaded before the lookback period aren't
// considered in compactor cycles
CompactorMaxLookback(userID string) time.Duration
}

// MultitenantCompactor is a multi-tenant TSDB block compactor based on Thanos.
Expand Down Expand Up @@ -795,7 +797,7 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e

// Disable maxLookback (set to 0s) when block upload is enabled, block upload enabled implies there will be blocks
// beyond the lookback period, we don't want the compactor to skip these
var maxLookback = c.compactorCfg.MaxLookback
var maxLookback = c.cfgProvider.CompactorMaxLookback(userID)
if c.cfgProvider.CompactorBlockUploadEnabled(userID) {
maxLookback = 0
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ type Limits struct {
CompactorBlockUploadVerifyChunks bool `yaml:"compactor_block_upload_verify_chunks" json:"compactor_block_upload_verify_chunks"`
CompactorBlockUploadMaxBlockSizeBytes int64 `yaml:"compactor_block_upload_max_block_size_bytes" json:"compactor_block_upload_max_block_size_bytes" category:"advanced"`
CompactorInMemoryTenantMetaCacheSize int `yaml:"compactor_in_memory_tenant_meta_cache_size" json:"compactor_in_memory_tenant_meta_cache_size" category:"experimental" doc:"hidden"`
CompactorMaxLookback model.Duration `yaml:"compactor_max_lookback" json:"compactor_max_lookback" category:"experimental"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
Expand Down Expand Up @@ -381,6 +382,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.CompactorBlockUploadVerifyChunks, "compactor.block-upload-verify-chunks", true, "Verify chunks when uploading blocks via the upload API for the tenant.")
f.Int64Var(&l.CompactorBlockUploadMaxBlockSizeBytes, "compactor.block-upload-max-block-size-bytes", 0, "Maximum size in bytes of a block that is allowed to be uploaded or validated. 0 = no limit.")
f.IntVar(&l.CompactorInMemoryTenantMetaCacheSize, "compactor.in-memory-tenant-meta-cache-size", 0, "Size of per-tenant in-memory cache for parsed meta.json files. This is useful when meta.json files are big and parsing is expensive. Small meta.json files are not cached. 0 means this cache is disabled.")
f.Var(&l.CompactorMaxLookback, "compactor.max-lookback", "Blocks uploaded before the lookback aren't considered in compactor cycles. If set, this value should be larger than all values in `-blocks-storage.tsdb.block-ranges-period`. A value of 0s means that all blocks are considered regardless of their upload time.")

// Query-frontend.
f.Var(&l.MaxTotalQueryLength, MaxTotalQueryLengthFlag, "Limit the total query time range (end - start time). This limit is enforced in the query-frontend on the received instant, range or remote read query.")
Expand Down Expand Up @@ -908,6 +910,12 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).RulerEvaluationDelay)
}

// CompactorMaxLookback returns the duration of the compactor lookback period, blocks uploaded before the lookback period aren't
// considered in compactor cycles
func (o *Overrides) CompactorMaxLookback(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).CompactorMaxLookback)
}

// CompactorBlocksRetentionPeriod returns the retention period for a given user.
func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).CompactorBlocksRetentionPeriod)
Expand Down

0 comments on commit c7bd8bd

Please sign in to comment.