Skip to content

Commit

Permalink
querier: allow enforcing a series query request limit
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Feb 26, 2025
1 parent 410d110 commit 86c9af2
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 8 deletions.
47 changes: 40 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package querier

import (
"cmp"
"context"
"errors"
"flag"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -318,7 +320,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matc

// Select implements storage.Querier interface.
// The bool passed is ignored because the series are always sorted.
func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) (set storage.SeriesSet) {
spanLog, ctx := spanlogger.NewWithLogger(ctx, mq.logger, "querier.Select")
defer spanLog.Span.Finish()

Expand Down Expand Up @@ -356,21 +358,24 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin
return storage.ErrSeriesSet(err)
}

// Validate query time range. Even if the time range has already been validated when we created
// the querier, we need to check it again here because the time range specified in hints may be
// different.
// Validate query parameters.
// Even if the time range has already been validated when we created the querier, we need to check it again here
// because the time range specified in hints may be different.
limit := sp.Limit
startMs, endMs, err := validateQueryTimeRange(userID, sp.Start, sp.End, now.UnixMilli(), mq.limits, spanLog)
if errors.Is(err, errEmptyTimeRange) {
return storage.NoopSeriesSet()
} else if err != nil {
return storage.ErrSeriesSet(err)
}
if sp.Func == "series" { // Clamp max time range for series-only queries, before we check max length.
if sp.Func == "series" {
// Clamp max time range for series-only queries, before we check max length.
startMs = clampToMaxLabelQueryLength(spanLog, startMs, endMs, now.UnixMilli(), mq.limits.MaxLabelsQueryLength(userID).Milliseconds())
// Clamp the limit for series-only queries.
limit = clampToMaxSeriesQueryLimit(spanLog, limit, mq.limits.MaxSeriesQueryLimit(userID))
}

// The time range may have been manipulated during the validation,
// so we make sure changes are reflected back to hints.
// The query parameters may have been manipulated during the validation, so we make sure changes are reflected back to hints.
sp.Start = startMs
sp.End = endMs

Expand All @@ -382,6 +387,16 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin
return storage.ErrSeriesSet(NewMaxQueryLengthError(endTime.Sub(startTime), maxQueryLength))
}

// If the original request didn't have a limit but the limit was enforced, annotate the resulting series set with a warning.
if sp.Limit == 0 && sp.Limit != limit {
defer func() {
var warning annotations.Annotations
warning.Add(errors.New("results may be truncated due to limit"))
set = series.NewSeriesSetWithWarnings(set, warning)
}()
}
sp.Limit = limit

if len(queriers) == 1 {
return queriers[0].Select(ctx, true, sp, matchers...)
}
Expand Down Expand Up @@ -450,6 +465,24 @@ func clampToMaxLabelQueryLength(spanLog *spanlogger.SpanLogger, startMs, endMs,
return startMs
}

// clampToMaxSeriesQueryLimit caps the request limit of a series query at
// maxSeriesQueryLimit. A maxSeriesQueryLimit of 0 disables enforcement and
// returns the original limit untouched; a limit of 0 (the request asked for
// no limit) is replaced by maxSeriesQueryLimit. Any change to the limit is
// recorded on the span log for debuggability.
func clampToMaxSeriesQueryLimit(spanLog *spanlogger.SpanLogger, limit, maxSeriesQueryLimit int) int {
	if maxSeriesQueryLimit == 0 {
		// Enforcement is disabled for this tenant.
		return limit
	}

	// Treat a zero (absent) request limit as the enforced maximum, then cap
	// whatever limit we ended up with at the enforced maximum.
	clamped := cmp.Or(limit, maxSeriesQueryLimit)
	if clamped > maxSeriesQueryLimit {
		clamped = maxSeriesQueryLimit
	}

	if clamped != limit {
		spanLog.DebugLog(
			"msg", "the request limit of the query has been manipulated because of the 'max-series-query-limit' setting",
			"original", limit,
			"updated", clamped,
		)
	}
	return clamped
}

// LabelValues implements storage.Querier.
func (mq multiQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
ctx, queriers, err := mq.getQueriers(ctx, matchers...)
Expand Down
115 changes: 114 additions & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package querier

import (
"context"
stderrors "errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -1120,7 +1121,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) {
delta := float64(5000)
require.Len(t, distributor.Calls, 1)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
gotStartMillis := int64(distributor.Calls[0].Arguments.Get(1).(model.Time))
assert.InDeltaf(t, util.TimeToMillis(testData.expectedMetadataStartTime), gotStartMillis, delta, "expected start %s, got %s", testData.expectedMetadataStartTime.UTC(), util.TimeFromMillis(gotStartMillis).UTC())
gotEndMillis := int64(distributor.Calls[0].Arguments.Get(2).(model.Time))
Expand All @@ -1129,6 +1129,119 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) {
}
}

// TestQuerier_ValidateQuery_MaxSeriesQueryLimit verifies that the querier
// clamps the limit of a series request ("series" select hints) according to
// the per-tenant max-series-query-limit setting, and that a warning is
// attached to the series set when a limit is enforced on a request that did
// not ask for one.
func TestQuerier_ValidateQuery_MaxSeriesQueryLimit(t *testing.T) {
	const thirtyDays = 30 * 24 * time.Hour

	now := time.Now()

	tests := map[string]struct {
		maxSeriesQueryLimit int       // per-tenant enforced limit; 0 disables enforcement
		query               string
		queryStartTime      time.Time
		queryEndTime        time.Time
		queryLimit          int       // limit carried by the incoming request's hints
		expectedLimit       int       // limit expected in the hints forwarded to the distributor
		expectedWarning     error     // expected warning on the returned series set, nil for none
	}{
		"should not manipulate limit for a query when limit is not enforced": {
			maxSeriesQueryLimit: 0,
			query:               "rate(foo[29d])",
			queryStartTime:      now.Add(-time.Hour),
			queryEndTime:        now,
			queryLimit:          1000,
			expectedLimit:       1000,
			expectedWarning:     nil,
		},
		"should not manipulate limit for a query without a limit when not enforced": {
			maxSeriesQueryLimit: 0,
			query:               "rate(foo[29d])",
			queryStartTime:      now.Add(-time.Hour),
			queryEndTime:        now,
			queryLimit:          0,
			expectedLimit:       0,
			expectedWarning:     nil,
		},
		"should manipulate limit for a query when enforced": {
			maxSeriesQueryLimit: 1000,
			query:               "rate(foo[29d])",
			queryStartTime:      now.Add(-time.Hour),
			queryEndTime:        now,
			queryLimit:          1_000_000,
			expectedLimit:       1000,
			expectedWarning:     nil,
		},
		"should manipulate limit for a query without a limit when enforced": {
			maxSeriesQueryLimit: 1000,
			query:               "rate(foo[29d])",
			queryStartTime:      now.Add(-time.Hour),
			queryEndTime:        now,
			queryLimit:          0,
			expectedLimit:       1000,
			expectedWarning:     errors.New("results may be truncated due to limit"),
		},
		"should not manipulate limit for a query with limit smaller than what is enforced": {
			maxSeriesQueryLimit: 1000,
			query:               "rate(foo[29d])",
			queryStartTime:      now.Add(-time.Hour),
			queryEndTime:        now,
			queryLimit:          100,
			expectedLimit:       100,
			expectedWarning:     nil,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			ctx := user.InjectOrgID(context.Background(), "test")

			var cfg Config
			flagext.DefaultValues(&cfg)

			limits := defaultLimitsConfig()
			limits.MaxQueryLookback = model.Duration(thirtyDays * 2)
			limits.MaxSeriesQueryLimit = testData.maxSeriesQueryLimit
			limits.QueryIngestersWithin = 0 // Always query ingesters in this test.
			overrides, err := validation.NewOverrides(limits, nil)
			require.NoError(t, err)

			// The distributor returns no series; this test only inspects the
			// hints the querier forwards and the warnings it attaches.
			distributor := &mockDistributor{}
			distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil)

			queryable, _, _, err := New(cfg, overrides, distributor, nil, nil, log.NewNopLogger(), nil)
			require.NoError(t, err)

			q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
			require.NoError(t, err)

			// Func "series" marks this as a series-only request, which is the
			// only kind the limit clamping applies to.
			hints := &storage.SelectHints{
				Start: util.TimeToMillis(testData.queryStartTime),
				End:   util.TimeToMillis(testData.queryEndTime),
				Limit: testData.queryLimit,
				Func:  "series",
			}
			matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test")

			set := q.Select(ctx, false, hints, matcher)
			require.False(t, set.Next()) // Expected to be empty.
			require.NoError(t, set.Err())

			// Assert on the warning about enforced limit.
			gotWarnings := set.Warnings().AsErrors()
			if testData.expectedWarning != nil {
				require.EqualError(t, stderrors.Join(gotWarnings...), testData.expectedWarning.Error())
			} else {
				require.Empty(t, gotWarnings)
			}

			// Assert on the limit of the actual executed query.
			require.Len(t, distributor.Calls, 1)
			assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
			gotHints := distributor.Calls[0].Arguments.Get(3).(*storage.SelectHints)
			require.Equal(t, testData.expectedLimit, gotHints.Limit)
		})
	}
}

func testRangeQuery(t testing.TB, queryable storage.Queryable, end model.Time, q query) *promql.Result {
dir := t.TempDir()
queryTracker := promql.NewActiveQueryTracker(dir, 10, promslog.NewNopLogger())
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type Limits struct {
MaxPartialQueryLength model.Duration `yaml:"max_partial_query_length" json:"max_partial_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
MaxLabelsQueryLength model.Duration `yaml:"max_labels_query_length" json:"max_labels_query_length"`
MaxSeriesQueryLimit int `yaml:"max_series_query_limit" json:"max_series_query_limit"`
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness" category:"advanced"`
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
QueryShardingTotalShards int `yaml:"query_sharding_total_shards" json:"query_sharding_total_shards"`
Expand Down Expand Up @@ -334,6 +335,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler for instant, range and remote read queries. For metadata queries like series, label names, label values queries the limit is enforced in the querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split (by time) or partial (by shard) queries that will be scheduled in parallel by the query-frontend for a single input query. This limit is introduced to have a fairer query scheduling and avoid a single query over a large time range saturating all available queriers.")
f.Var(&l.MaxLabelsQueryLength, "store.max-labels-query-length", "Limit the time range (end - start time) of series, label names and values queries. This limit is enforced in the querier. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxSeriesQueryLimit, "querier.max-series-query-limit", 0, "Maximum number of items, series queries. This limit is enforced in the querier. If the requested limit is outside the allowed value, the request will not fail but will be manipulated to only query data up to allowed limit. 0 to disable.")
f.IntVar(&l.LabelNamesAndValuesResultsMaxSizeBytes, "querier.label-names-and-values-results-max-size-bytes", 400*1024*1024, "Maximum size in bytes of distinct label names and values. When querier receives response from ingester, it merges the response with responses from other ingesters. This maximum size limit is applied to the merged(distinct) results. If the limit is reached, an error is returned.")
f.IntVar(&l.ActiveSeriesResultsMaxSizeBytes, "querier.active-series-results-max-size-bytes", 400*1024*1024, "Maximum size of an active series or active native histogram series request result shard in bytes. 0 to disable.")
f.BoolVar(&l.CardinalityAnalysisEnabled, "querier.cardinality-analysis-enabled", false, "Enables endpoints used for cardinality analysis.")
Expand Down Expand Up @@ -755,6 +757,11 @@ func (o *Overrides) MaxLabelsQueryLength(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MaxLabelsQueryLength)
}

// MaxSeriesQueryLimit returns the maximum limit enforced on series requests
// for the given tenant. 0 means no limit is enforced.
func (o *Overrides) MaxSeriesQueryLimit(userID string) int {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.MaxSeriesQueryLimit
}

// MaxCacheFreshness returns the period after which results are cacheable,
// to prevent caching of very recent results.
func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
Expand Down

0 comments on commit 86c9af2

Please sign in to comment.