diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 949c3c33fac..a9ec7e42e22 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -4499,6 +4499,16 @@ "fieldFlag": "store.max-labels-query-length", "fieldType": "duration" }, + { + "kind": "field", + "name": "max_series_query_limit", + "required": false, + "desc": "Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "querier.max-series-query-limit", + "fieldType": "int" + }, { "kind": "field", "name": "max_cache_freshness", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 852cf94bebe..61969699231 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2167,6 +2167,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers. (default 14) -querier.max-samples int Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000) + -querier.max-series-query-limit int + Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable. -querier.mimir-query-engine.disabled-aggregations comma-separated-list-of-strings [experimental] Comma-separated list of aggregations to disable support for. Only applies if MQE is in use. -querier.mimir-query-engine.disabled-functions comma-separated-list-of-strings diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index 9beddf72ff8..7abc24ea50a 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -557,6 +557,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers. (default 14) -querier.max-samples int Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000) + -querier.max-series-query-limit int + Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable. -querier.scheduler-address string Address of the query-scheduler component, in host:port format. The host should resolve to all query-scheduler instances. This option should be set only when query-scheduler component is in use and -query-scheduler.service-discovery-mode is set to 'dns'. -querier.timeout duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index c469c7f27a1..8dfb44ba706 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3635,6 +3635,13 @@ The `limits` block configures default and per-tenant limits imposed by component # CLI flag: -store.max-labels-query-length [max_labels_query_length: | default = 0s] +# Maximum number of items, series queries. This limit is enforced in the +# querier. If the requested limit is outside the allowed value, the request will +# not fail but will be manipulated to only query data up to allowed limit. 0 to +# disable. +# CLI flag: -querier.max-series-query-limit +[max_series_query_limit: | default = 0] + # (advanced) Most recent allowed cacheable result per-tenant, to prevent caching # very recent results that might still be in flux. # CLI flag: -query-frontend.max-cache-freshness diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 3822b805712..4a6c2c0634a 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -6,6 +6,7 @@ package querier import ( + "cmp" "context" "errors" "flag" @@ -30,6 +31,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/chunk" "github.com/grafana/mimir/pkg/storage/lazyquery" + "github.com/grafana/mimir/pkg/storage/series" "github.com/grafana/mimir/pkg/streamingpromql" "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/util" @@ -318,7 +320,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matc // Select implements storage.Querier interface. // The bool passed is ignored because the series are always sorted. -func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) (set storage.SeriesSet) { spanLog, ctx := spanlogger.NewWithLogger(ctx, mq.logger, "querier.Select") defer spanLog.Span.Finish() @@ -356,21 +358,24 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin return storage.ErrSeriesSet(err) } - // Validate query time range. Even if the time range has already been validated when we created - // the querier, we need to check it again here because the time range specified in hints may be - // different. + // Validate query parameters. + // Even if the time range has already been validated when we created the querier, we need to check it again here + // because the time range specified in hints may be different. + limit := sp.Limit startMs, endMs, err := validateQueryTimeRange(userID, sp.Start, sp.End, now.UnixMilli(), mq.limits, spanLog) if errors.Is(err, errEmptyTimeRange) { return storage.NoopSeriesSet() } else if err != nil { return storage.ErrSeriesSet(err) } - if sp.Func == "series" { // Clamp max time range for series-only queries, before we check max length. + if sp.Func == "series" { + // Clamp max time range for series-only queries, before we check max length. startMs = clampToMaxLabelQueryLength(spanLog, startMs, endMs, now.UnixMilli(), mq.limits.MaxLabelsQueryLength(userID).Milliseconds()) + // Clamp the limit for series-only queries. + limit = clampToMaxSeriesQueryLimit(spanLog, limit, mq.limits.MaxSeriesQueryLimit(userID)) } - // The time range may have been manipulated during the validation, - // so we make sure changes are reflected back to hints. + // The query parameters may have been manipulated during the validation, so we make sure changes are reflected back to hints. sp.Start = startMs sp.End = endMs @@ -382,6 +387,16 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin return storage.ErrSeriesSet(NewMaxQueryLengthError(endTime.Sub(startTime), maxQueryLength)) } + // If the original request didn't have a limit but the limit was enforced, annotate the resulting series set with a warning. + if sp.Limit == 0 && sp.Limit != limit { + defer func() { + var warning annotations.Annotations + warning.Add(errors.New("results may be truncated due to limit")) + set = series.NewSeriesSetWithWarnings(set, warning) + }() + } + sp.Limit = limit + if len(queriers) == 1 { return queriers[0].Select(ctx, true, sp, matchers...) } @@ -450,6 +465,24 @@ func clampToMaxLabelQueryLength(spanLog *spanlogger.SpanLogger, startMs, endMs, return startMs } +func clampToMaxSeriesQueryLimit(spanLog *spanlogger.SpanLogger, limit, maxSeriesQueryLimit int) int { + if maxSeriesQueryLimit == 0 { + // No request limit is enforced. + return limit + } + + // Request limit is enforced. + newLimit := min(cmp.Or(limit, maxSeriesQueryLimit), maxSeriesQueryLimit) + if newLimit != limit { + spanLog.DebugLog( + "msg", "the request limit of the query has been manipulated because of the 'max-series-query-limit' setting", + "original", limit, + "updated", newLimit, + ) + } + return newLimit +} + // LabelValues implements storage.Querier. func (mq multiQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { ctx, queriers, err := mq.getQueriers(ctx, matchers...) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 97a3d28af06..41cfbd84c9d 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -7,6 +7,7 @@ package querier import ( "context" + stderrors "errors" "fmt" "testing" "time" @@ -1120,7 +1121,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) { delta := float64(5000) require.Len(t, distributor.Calls, 1) assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method) - assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method) gotStartMillis := int64(distributor.Calls[0].Arguments.Get(1).(model.Time)) assert.InDeltaf(t, util.TimeToMillis(testData.expectedMetadataStartTime), gotStartMillis, delta, "expected start %s, got %s", testData.expectedMetadataStartTime.UTC(), util.TimeFromMillis(gotStartMillis).UTC()) gotEndMillis := int64(distributor.Calls[0].Arguments.Get(2).(model.Time)) @@ -1129,6 +1129,119 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) { } } +func TestQuerier_ValidateQuery_MaxSeriesQueryLimit(t *testing.T) { + const thirtyDays = 30 * 24 * time.Hour + + now := time.Now() + + tests := map[string]struct { + maxSeriesQueryLimit int + query string + queryStartTime time.Time + queryEndTime time.Time + queryLimit int + expectedLimit int + expectedWarning error + }{ + "should not manipulate limit for a query when limit is not enforced": { + maxSeriesQueryLimit: 0, + query: "rate(foo[29d])", + queryStartTime: now.Add(-time.Hour), + queryEndTime: now, + queryLimit: 1000, + expectedLimit: 1000, + expectedWarning: nil, + }, + "should not manipulate limit for a query without a limit when not enforced": { + maxSeriesQueryLimit: 0, + query: "rate(foo[29d])", + queryStartTime: now.Add(-time.Hour), + queryEndTime: now, + queryLimit: 0, + expectedLimit: 0, + expectedWarning: nil, + }, + "should manipulate limit for a query when enforced": { + maxSeriesQueryLimit: 1000, + query: "rate(foo[29d])", + queryStartTime: now.Add(-time.Hour), + queryEndTime: now, + queryLimit: 1_000_000, + expectedLimit: 1000, + expectedWarning: nil, + }, + "should manipulate limit for a query without a limit when enforced": { + maxSeriesQueryLimit: 1000, + query: "rate(foo[29d])", + queryStartTime: now.Add(-time.Hour), + queryEndTime: now, + queryLimit: 0, + expectedLimit: 1000, + expectedWarning: errors.New("results may be truncated due to limit"), + }, + "should not manipulate limit for a query with limit smaller than what is enforced": { + maxSeriesQueryLimit: 1000, + query: "rate(foo[29d])", + queryStartTime: now.Add(-time.Hour), + queryEndTime: now, + queryLimit: 100, + expectedLimit: 100, + expectedWarning: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + var cfg Config + flagext.DefaultValues(&cfg) + + limits := defaultLimitsConfig() + limits.MaxQueryLookback = model.Duration(thirtyDays * 2) + limits.MaxSeriesQueryLimit = testData.maxSeriesQueryLimit + limits.QueryIngestersWithin = 0 // Always query ingesters in this test. + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + distributor := &mockDistributor{} + distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) + + queryable, _, _, err := New(cfg, overrides, distributor, nil, nil, log.NewNopLogger(), nil) + require.NoError(t, err) + + q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) + require.NoError(t, err) + + hints := &storage.SelectHints{ + Start: util.TimeToMillis(testData.queryStartTime), + End: util.TimeToMillis(testData.queryEndTime), + Limit: testData.queryLimit, + Func: "series", + } + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test") + + set := q.Select(ctx, false, hints, matcher) + require.False(t, set.Next()) // Expected to be empty. + require.NoError(t, set.Err()) + + // Assert on the warning about enforced limit. + gotWarnings := set.Warnings().AsErrors() + if testData.expectedWarning != nil { + require.EqualError(t, stderrors.Join(gotWarnings...), testData.expectedWarning.Error()) + } else { + require.Empty(t, gotWarnings) + } + + // Assert on the limit of the actual executed query. + require.Len(t, distributor.Calls, 1) + assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method) + gotHints := distributor.Calls[0].Arguments.Get(3).(*storage.SelectHints) + require.Equal(t, testData.expectedLimit, gotHints.Limit) + }) + } +} + func testRangeQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result { dir := t.TempDir() queryTracker := promql.NewActiveQueryTracker(dir, 10, promslog.NewNopLogger()) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index ac83945718d..336cd919444 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -171,6 +171,7 @@ type Limits struct { MaxPartialQueryLength model.Duration `yaml:"max_partial_query_length" json:"max_partial_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` MaxLabelsQueryLength model.Duration `yaml:"max_labels_query_length" json:"max_labels_query_length"` + MaxSeriesQueryLimit int `yaml:"max_series_query_limit" json:"max_series_query_limit"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness" category:"advanced"` MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` QueryShardingTotalShards int `yaml:"query_sharding_total_shards" json:"query_sharding_total_shards"` @@ -334,6 +335,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler for instant, range and remote read queries. For metadata queries like series, label names, label values queries the limit is enforced in the querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers.") f.Var(&l.MaxLabelsQueryLength, "store.max-labels-query-length", "Limit the time range (end - start time) of series, label names and values queries. This limit is enforced in the querier. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.") + f.IntVar(&l.MaxSeriesQueryLimit, "querier.max-series-query-limit", 0, "Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.") f.IntVar(&l.LabelNamesAndValuesResultsMaxSizeBytes, "querier.label-names-and-values-results-max-size-bytes", 400*1024*1024, "Maximum size in bytes of distinct label names and values. When querier receives response from ingester, it merges the response with responses from other ingesters. This maximum size limit is applied to the merged(distinct) results. If the limit is reached, an error is returned.") f.IntVar(&l.ActiveSeriesResultsMaxSizeBytes, "querier.active-series-results-max-size-bytes", 400*1024*1024, "Maximum size of an active series or active native histogram series request result shard in bytes. 0 to disable.") f.BoolVar(&l.CardinalityAnalysisEnabled, "querier.cardinality-analysis-enabled", false, "Enables endpoints used for cardinality analysis.") @@ -755,6 +757,11 @@ func (o *Overrides) MaxLabelsQueryLength(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxLabelsQueryLength) } +// MaxSeriesQueryLimit returns the query limit of a series request. +func (o *Overrides) MaxSeriesQueryLimit(userID string) int { + return o.getOverridesForUser(userID).MaxSeriesQueryLimit +} + // MaxCacheFreshness returns the period after which results are cacheable, // to prevent caching of very recent results. func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {