querier: allow to enforce series query request limit #10748

Open · wants to merge 3 commits into base: main
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -4499,6 +4499,16 @@
"fieldFlag": "store.max-labels-query-length",
"fieldType": "duration"
},
{
"kind": "field",
"name": "max_series_query_limit",
"required": false,
"desc": "Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "querier.max-series-query-limit",
"fieldType": "int"
},
{
"kind": "field",
"name": "max_cache_freshness",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2167,6 +2167,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers. (default 14)
-querier.max-samples int
Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000)
-querier.max-series-query-limit int
Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.
-querier.mimir-query-engine.disabled-aggregations comma-separated-list-of-strings
[experimental] Comma-separated list of aggregations to disable support for. Only applies if MQE is in use.
-querier.mimir-query-engine.disabled-functions comma-separated-list-of-strings
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -557,6 +557,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers. (default 14)
-querier.max-samples int
Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000)
-querier.max-series-query-limit int
Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.
-querier.scheduler-address string
Address of the query-scheduler component, in host:port format. The host should resolve to all query-scheduler instances. This option should be set only when query-scheduler component is in use and -query-scheduler.service-discovery-mode is set to 'dns'.
-querier.timeout duration
@@ -3635,6 +3635,13 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -store.max-labels-query-length
[max_labels_query_length: <duration> | default = 0s]

# Maximum number of items, series queries. This limit is enforced in the
# querier. If the requested limit is outside the allowed value, the request will
# not fail but will be manipulated to only query data up to allowed limit. 0 to
# disable.
# CLI flag: -querier.max-series-query-limit
[max_series_query_limit: <int> | default = 0]

Contributor: I'm a little confused by the phrasing here. Is this the maximum number of series queries permitted? If so, we can remove the "items" and just say "Maximum number of series queries."

Contributor (on lines +3639 to +3641):

Suggested change:
-# querier. If the requested limit is outside the allowed value, the request will
-# not fail but will be manipulated to only query data up to allowed limit. 0 to
-# disable.
+# querier. If the requested limit is outside of the allowed value, the request doesn't
+# fail, but is manipulated to only query data up to the allowed limit. Set to `0` to
+# disable.

# (advanced) Most recent allowed cacheable result per-tenant, to prevent caching
# very recent results that might still be in flux.
# CLI flag: -query-frontend.max-cache-freshness
47 changes: 40 additions & 7 deletions pkg/querier/querier.go
@@ -6,6 +6,7 @@
package querier

import (
"cmp"
"context"
"errors"
"flag"
@@ -30,6 +31,7 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/chunk"
"github.com/grafana/mimir/pkg/storage/lazyquery"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/streamingpromql"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/util"
@@ -318,7 +320,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matc

// Select implements storage.Querier interface.
// The bool passed is ignored because the series are always sorted.
-func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) (set storage.SeriesSet) {
spanLog, ctx := spanlogger.NewWithLogger(ctx, mq.logger, "querier.Select")
defer spanLog.Span.Finish()

@@ -356,21 +358,24 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin
return storage.ErrSeriesSet(err)
}

-// Validate query time range. Even if the time range has already been validated when we created
-// the querier, we need to check it again here because the time range specified in hints may be
-// different.
+// Validate query parameters.
+// Even if the time range has already been validated when we created the querier, we need to check it again here
+// because the time range specified in hints may be different.
+limit := sp.Limit
startMs, endMs, err := validateQueryTimeRange(userID, sp.Start, sp.End, now.UnixMilli(), mq.limits, spanLog)
if errors.Is(err, errEmptyTimeRange) {
return storage.NoopSeriesSet()
} else if err != nil {
return storage.ErrSeriesSet(err)
}
-if sp.Func == "series" { // Clamp max time range for series-only queries, before we check max length.
+if sp.Func == "series" {
+// Clamp max time range for series-only queries, before we check max length.
startMs = clampToMaxLabelQueryLength(spanLog, startMs, endMs, now.UnixMilli(), mq.limits.MaxLabelsQueryLength(userID).Milliseconds())
+// Clamp the limit for series-only queries.
+limit = clampToMaxSeriesQueryLimit(spanLog, limit, mq.limits.MaxSeriesQueryLimit(userID))
}

-// The time range may have been manipulated during the validation,
-// so we make sure changes are reflected back to hints.
+// The query parameters may have been manipulated during the validation, so we make sure changes are reflected back to hints.
sp.Start = startMs
sp.End = endMs

@@ -382,6 +387,16 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin
return storage.ErrSeriesSet(NewMaxQueryLengthError(endTime.Sub(startTime), maxQueryLength))
}

// If the original request didn't have a limit but the limit was enforced, annotate the resulting series set with a warning.
if sp.Limit == 0 && sp.Limit != limit {
defer func() {
var warning annotations.Annotations
warning.Add(errors.New("results may be truncated due to limit"))
set = series.NewSeriesSetWithWarnings(set, warning)
}()
}
sp.Limit = limit

if len(queriers) == 1 {
return queriers[0].Select(ctx, true, sp, matchers...)
}

Contributor (on the warning.Add call): Can we add more details? What is the limit, and what was the requested limit?

Contributor Author (@narqo, Feb 27, 2025): This only adds a warning when the request didn't send a limit. My thinking was that when I pass a ?limit= explicitly, Prometheus adds its own warning to the response (ref code). I will expand the message.

Contributor: Oh, I misread && as ||, so I assumed this would add a warning whenever we clamp the limit. Do you want to do that too?

Contributor Author (@narqo, Feb 27, 2025): I'm not sure. We don't do anything like that when we clamp other request parameters. Should we do that for the limit only? My thought was to warn the user when they didn't pass a limit but the limit was enforced, since that feels like the most unexpected scenario for them.

Contributor: I agree that putting a limit when the requester intended to get unlimited results is the most surprising case. But getting N results when you limited to N+1 would imply there are only N results. That's also a lie and somewhat surprising. I think it's not a bad idea to add a warning when we clamp the start/end times (I suppose that's what you meant by "other request parameters"). I can do that next week.

Collaborator: Creating a new comment to not derail the conversation you're having with Dimitar. I suggest being more verbose and clarifying what we did, e.g. "the request had no limit, but a default limit of XXX has been enforced" (or something similar).
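Picking up the Collaborator's suggestion above, here is a minimal sketch of what a more detailed annotation could look like. The message wording, the fmt.Errorf call, and the standalone main are illustrative assumptions, not the PR's final implementation; in the PR, the resulting annotations would be attached via series.NewSeriesSetWithWarnings as shown in the diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/annotations"
)

func main() {
	// Assumed enforced per-tenant default, for illustration only.
	const enforcedLimit = 1000

	// Hypothetical, more verbose warning: state that the request carried no
	// limit and name the default that was enforced, per the review suggestion.
	var warning annotations.Annotations
	warning.Add(fmt.Errorf(
		"the request had no limit, but a default limit of %d has been enforced; results may be truncated",
		enforcedLimit,
	))

	// Print the warnings as they would surface in a query response.
	for _, err := range warning.AsErrors() {
		fmt.Println(err)
	}
}
```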
@@ -450,6 +465,24 @@ func clampToMaxLabelQueryLength(spanLog *spanlogger.SpanLogger, startMs, endMs,
return startMs
}

func clampToMaxSeriesQueryLimit(spanLog *spanlogger.SpanLogger, limit, maxSeriesQueryLimit int) int {
if maxSeriesQueryLimit == 0 {
// No request limit is enforced.
return limit
}

// Request limit is enforced.
newLimit := min(cmp.Or(limit, maxSeriesQueryLimit), maxSeriesQueryLimit)
if newLimit != limit {
spanLog.DebugLog(
"msg", "the request limit of the query has been manipulated because of the 'max-series-query-limit' setting",
"original", limit,
"updated", newLimit,
)
}
return newLimit
}

// LabelValues implements storage.Querier.
func (mq multiQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
ctx, queriers, err := mq.getQueriers(ctx, matchers...)
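A quick aside on clampToMaxSeriesQueryLimit above: cmp.Or returns the first non-zero value among its arguments, so a request without a limit (limit == 0) first falls back to the enforced maximum, and min then caps any explicit request. A minimal, self-contained sketch of the same behavior; the clamp helper here is a hypothetical stand-in that drops the span logging:

```go
package main

import (
	"cmp"
	"fmt"
)

// clamp mirrors the logic of clampToMaxSeriesQueryLimit from the diff above,
// minus the span logging: a zero max disables enforcement; otherwise a zero
// (absent) requested limit falls back to max, and larger requests are capped.
func clamp(limit, max int) int {
	if max == 0 {
		return limit // no enforcement configured
	}
	// cmp.Or(limit, max) yields limit when it is non-zero, otherwise max.
	return min(cmp.Or(limit, max), max)
}

func main() {
	fmt.Println(clamp(0, 1000))    // 1000: unlimited request is capped to the enforced default
	fmt.Println(clamp(100, 1000))  // 100: a smaller requested limit passes through
	fmt.Println(clamp(5000, 1000)) // 1000: a larger requested limit is clamped down
	fmt.Println(clamp(5000, 0))    // 5000: enforcement disabled, request unchanged
}
```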
115 changes: 114 additions & 1 deletion pkg/querier/querier_test.go
@@ -7,6 +7,7 @@ package querier

import (
"context"
stderrors "errors"
"fmt"
"testing"
"time"
@@ -1120,7 +1121,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) {
delta := float64(5000)
require.Len(t, distributor.Calls, 1)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
-assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
gotStartMillis := int64(distributor.Calls[0].Arguments.Get(1).(model.Time))
assert.InDeltaf(t, util.TimeToMillis(testData.expectedMetadataStartTime), gotStartMillis, delta, "expected start %s, got %s", testData.expectedMetadataStartTime.UTC(), util.TimeFromMillis(gotStartMillis).UTC())
gotEndMillis := int64(distributor.Calls[0].Arguments.Get(2).(model.Time))
@@ -1129,6 +1129,119 @@
}
}

func TestQuerier_ValidateQuery_MaxSeriesQueryLimit(t *testing.T) {
const thirtyDays = 30 * 24 * time.Hour

now := time.Now()

tests := map[string]struct {
maxSeriesQueryLimit int
query string
queryStartTime time.Time
queryEndTime time.Time
queryLimit int
expectedLimit int
expectedWarning error
}{
"should not manipulate limit for a query when limit is not enforced": {
maxSeriesQueryLimit: 0,
query: "rate(foo[29d])",
queryStartTime: now.Add(-time.Hour),
queryEndTime: now,
queryLimit: 1000,
expectedLimit: 1000,
expectedWarning: nil,
},
"should not manipulate limit for a query without a limit when not enforced": {
maxSeriesQueryLimit: 0,
query: "rate(foo[29d])",
queryStartTime: now.Add(-time.Hour),
queryEndTime: now,
queryLimit: 0,
expectedLimit: 0,
expectedWarning: nil,
},
"should manipulate limit for a query when enforced": {
maxSeriesQueryLimit: 1000,
query: "rate(foo[29d])",
queryStartTime: now.Add(-time.Hour),
queryEndTime: now,
queryLimit: 1_000_000,
expectedLimit: 1000,
expectedWarning: nil,
},
"should manipulate limit for a query without a limit when enforced": {
maxSeriesQueryLimit: 1000,
query: "rate(foo[29d])",
queryStartTime: now.Add(-time.Hour),
queryEndTime: now,
queryLimit: 0,
expectedLimit: 1000,
expectedWarning: errors.New("results may be truncated due to limit"),
},
"should not manipulate limit for a query with limit smaller than what is enforced": {
maxSeriesQueryLimit: 1000,
query: "rate(foo[29d])",
queryStartTime: now.Add(-time.Hour),
queryEndTime: now,
queryLimit: 100,
expectedLimit: 100,
expectedWarning: nil,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

var cfg Config
flagext.DefaultValues(&cfg)

limits := defaultLimitsConfig()
limits.MaxQueryLookback = model.Duration(thirtyDays * 2)
limits.MaxSeriesQueryLimit = testData.maxSeriesQueryLimit
limits.QueryIngestersWithin = 0 // Always query ingesters in this test.
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

distributor := &mockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil)

queryable, _, _, err := New(cfg, overrides, distributor, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
require.NoError(t, err)

hints := &storage.SelectHints{
Start: util.TimeToMillis(testData.queryStartTime),
End: util.TimeToMillis(testData.queryEndTime),
Limit: testData.queryLimit,
Func: "series",
}
matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test")

set := q.Select(ctx, false, hints, matcher)
require.False(t, set.Next()) // Expected to be empty.
require.NoError(t, set.Err())

// Assert on the warning about enforced limit.
gotWarnings := set.Warnings().AsErrors()
if testData.expectedWarning != nil {
require.EqualError(t, stderrors.Join(gotWarnings...), testData.expectedWarning.Error())
} else {
require.Empty(t, gotWarnings)
}

// Assert on the limit of the actual executed query.
require.Len(t, distributor.Calls, 1)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
gotHints := distributor.Calls[0].Arguments.Get(3).(*storage.SelectHints)
require.Equal(t, testData.expectedLimit, gotHints.Limit)
})
}
}

func testRangeQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result {
dir := t.TempDir()
queryTracker := promql.NewActiveQueryTracker(dir, 10, promslog.NewNopLogger())
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
@@ -171,6 +171,7 @@ type Limits struct {
MaxPartialQueryLength model.Duration `yaml:"max_partial_query_length" json:"max_partial_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
MaxLabelsQueryLength model.Duration `yaml:"max_labels_query_length" json:"max_labels_query_length"`
MaxSeriesQueryLimit int `yaml:"max_series_query_limit" json:"max_series_query_limit"`
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness" category:"advanced"`
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
QueryShardingTotalShards int `yaml:"query_sharding_total_shards" json:"query_sharding_total_shards"`
@@ -334,6 +335,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler for instant, range and remote read queries. For metadata queries like series, label names, label values queries the limit is enforced in the querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers.")
f.Var(&l.MaxLabelsQueryLength, "store.max-labels-query-length", "Limit the time range (end - start time) of series, label names and values queries. This limit is enforced in the querier. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxSeriesQueryLimit, "querier.max-series-query-limit", 0, "Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.")
f.IntVar(&l.LabelNamesAndValuesResultsMaxSizeBytes, "querier.label-names-and-values-results-max-size-bytes", 400*1024*1024, "Maximum size in bytes of distinct label names and values. When querier receives response from ingester, it merges the response with responses from other ingesters. This maximum size limit is applied to the merged(distinct) results. If the limit is reached, an error is returned.")
f.IntVar(&l.ActiveSeriesResultsMaxSizeBytes, "querier.active-series-results-max-size-bytes", 400*1024*1024, "Maximum size of an active series or active native histogram series request result shard in bytes. 0 to disable.")
f.BoolVar(&l.CardinalityAnalysisEnabled, "querier.cardinality-analysis-enabled", false, "Enables endpoints used for cardinality analysis.")
@@ -755,6 +757,11 @@ func (o *Overrides) MaxLabelsQueryLength(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MaxLabelsQueryLength)
}

// MaxSeriesQueryLimit returns the query limit of a series request.
func (o *Overrides) MaxSeriesQueryLimit(userID string) int {
return o.getOverridesForUser(userID).MaxSeriesQueryLimit
}

// MaxCacheFreshness returns the period after which results are cacheable,
// to prevent caching of very recent results.
func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {