Skip to content

Commit

Permalink
frontend: Add option to "spin off" subqueries as actual range queries (
Browse files Browse the repository at this point in the history
…#10460)

* frontend: Add option to "spin off" subqueries as actual range queries
Issue: #10023

This is a new feature that is completely isolated within a new middleware so it shouldn't affect current functionality of the frontend.
For safety, it requires two configurations to be enabled:
- `--query-frontend.spin-off-instant-subqueries-to-url=<url>` on the frontend. This should be set to the URL of the frontend for optimal performance. The range queries are load balanced across frontends
- `instant_queries_with_subquery_spin_off` in tenant configs. These are regexp patterns that allow us to match individual queries (or all of them). This will allow us to opt-in queries to enable the feature gradually

The feature was developed by basing myself upon the query sharding feature. The queries are mapped into either downstream queries or subqueries. Both types of queries are run and the results are fed back into prometheus' engine and the result is calculated in the frontend.

Performance impact:
The AST mapper only selects queries that are susceptible to be improved, others are just passed on to the next middleware.
For the queries that are improved, results can be up to 50x faster. When a query is selected, the worst cases I've seen are ~equal or a bit better in performance to unmodified queries.
Further tests will be done and the mapper may be improved to detect cases that aren't optimal

PromQL results impact:
None detected from all the tests I've done

* Address PR comments

* Address PR comments + fixes
- Support offsets
- Disable `@`
- Improve tests. Run each query with a different offset each time
- Add new test cases with `offset x`
- Add new test case with a long range (more than 11000 steps). It has to be split into multiple range queries
- Allow setting query path in frontend arg (instead of hardcoding `/prometheus/api/v1/query_range`)

* Make tests faster

* Address PR comments:
- Use step align code from MQE
- Fix up comments and log messages
- Return error in case of a wrong

* Make tests faster
- put the `SpinOffQueryHandler` into its own test
- do not test offset time on all queries

* Add retries to the spun-off range queries
Haven't seen any failures so far from my testing in Kubernetes, but it _will_ happen without some retries
This makes use of the regular retry middleware, so the configured retry settings will apply to these spun-off queries as well

Also, add some more tests

* Query Optimization: Align the query's end

* Add missing case of complex expressions

* Fix case where multiple range queries are merged
+ Add test that actually tests over a long range
  • Loading branch information
julienduchesne authored Jan 31, 2025
1 parent 87445ce commit 94a0f30
Show file tree
Hide file tree
Showing 20 changed files with 1,630 additions and 34 deletions.
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4369,6 +4369,16 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "instant_queries_with_subquery_spin_off",
"required": false,
"desc": "List of regular expression patterns matching instant queries. Subqueries within those instant queries will be spun off as range queries to optimize their performance.",
"fieldValue": null,
"fieldDefaultValue": [],
"fieldType": "list of strings",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cardinality_analysis_enabled",
Expand Down Expand Up @@ -6586,6 +6596,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "spin_off_instant_subqueries_to_url",
"required": false,
"desc": "If set, subqueries in instant queries are spun off as range queries and sent to the given URL. This parameter also requires you to set `instant_queries_with_subquery_spin_off` for the tenant.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "query-frontend.spin-off-instant-subqueries-to-url",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "query_result_response_format",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2429,6 +2429,8 @@ Usage of ./cmd/mimir/mimir:
Number of concurrent workers forwarding queries to single query-scheduler. (default 5)
-query-frontend.shard-active-series-queries
[experimental] True to enable sharding of active series queries.
-query-frontend.spin-off-instant-subqueries-to-url string
[experimental] If set, subqueries in instant queries are spun off as range queries and sent to the given URL. This parameter also requires you to set `instant_queries_with_subquery_spin_off` for the tenant.
-query-frontend.split-instant-queries-by-interval duration
[experimental] Split instant queries by an interval and execute in parallel. 0 to disable it.
-query-frontend.split-queries-by-interval duration
Expand Down
11 changes: 11 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,12 @@ results_cache:
# CLI flag: -query-frontend.use-active-series-decoder
[use_active_series_decoder: <boolean> | default = false]

# (experimental) If set, subqueries in instant queries are spun off as range
# queries and sent to the given URL. This parameter also requires you to set
# `instant_queries_with_subquery_spin_off` for the tenant.
# CLI flag: -query-frontend.spin-off-instant-subqueries-to-url
[spin_off_instant_subqueries_to_url: <string> | default = ""]

# Format to use when retrieving query results from queriers. Supported values:
# json, protobuf
# CLI flag: -query-frontend.query-result-response-format
Expand Down Expand Up @@ -3604,6 +3610,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -query-frontend.prom2-range-compat
[prom2_range_compat: <boolean> | default = false]

# (experimental) List of regular expression patterns matching instant queries.
# Subqueries within those instant queries will be spun off as range queries to
# optimize their performance.
[instant_queries_with_subquery_spin_off: <list of strings> | default = ]

# Enables endpoints used for cardinality analysis.
# CLI flag: -querier.cardinality-analysis-enabled
[cardinality_analysis_enabled: <boolean> | default = false]
Expand Down
209 changes: 209 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
)

const (
SubqueryMetricName = "__subquery_spinoff__"
SubqueryQueryLabelName = "__query__"
SubqueryRangeLabelName = "__range__"
SubqueryStepLabelName = "__step__"
SubqueryOffsetLabelName = "__offset__"

DownstreamQueryMetricName = "__downstream_query__"
DownstreamQueryLabelName = "__query__"
)

type subquerySpinOffMapper struct {
ctx context.Context
defaultStepFunc func(rangeMillis int64) int64

logger log.Logger
stats *SubquerySpinOffMapperStats
}

// NewSubquerySpinOffMapper creates a new instant query mapper.
func NewSubquerySpinOffMapper(ctx context.Context, defaultStepFunc func(rangeMillis int64) int64, logger log.Logger, stats *SubquerySpinOffMapperStats) ASTMapper {
queryMapper := NewASTExprMapper(
&subquerySpinOffMapper{
ctx: ctx,
defaultStepFunc: defaultStepFunc,
logger: logger,
stats: stats,
},
)

return NewMultiMapper(
queryMapper,
)
}

// MapExpr implements the ASTMapper interface.
// The strategy here is to look for aggregated subqueries (all subqueries should be aggregated) and spin them off into separate queries.
// The frontend does not have internal control of the engine,
// so MapExpr has to remap subqueries into "fake metrics" that can be queried by a Queryable that we can inject into the engine.
// This "fake metric selector" is the "__subquery_spinoff__" metric.
// For everything else, we have to pass it through to the downstream execution path (other instant middlewares),
// so we remap them into a "__downstream_query__" selector.
//
// See sharding.go and embedded.go for another example of mapping into a fake metric selector.
func (m *subquerySpinOffMapper) MapExpr(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
if err := m.ctx.Err(); err != nil {
return nil, false, err
}

// Immediately clone the expr to avoid mutating the original
expr, err = cloneExpr(expr)
if err != nil {
return nil, false, err
}

downstreamQuery := func(expr parser.Expr) (mapped parser.Expr, finished bool, err error) {
if countSelectors(expr) == 0 {
return expr, false, nil
}
selector := &parser.VectorSelector{
Name: DownstreamQueryMetricName,
LabelMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, DownstreamQueryLabelName, expr.String()),
},
}
m.stats.AddDownstreamQuery()
return selector, false, nil
}

switch e := expr.(type) {
case *parser.Call:
if len(e.Args) == 0 {
return expr, false, nil
}
lastArgIdx := len(e.Args) - 1
// The last argument will typically contain the subquery in an aggregation function
// Examples: last_over_time(<subquery>[5m:]) or quantile_over_time(0.5, <subquery>[5m:])
if sq, ok := e.Args[lastArgIdx].(*parser.SubqueryExpr); ok {
// @ is not supported
if sq.StartOrEnd != 0 || sq.Timestamp != nil {
return downstreamQuery(expr)
}

// Filter out subqueries with ranges less than 1 hour as they are not worth spinning off.
if sq.Range < 1*time.Hour {
return downstreamQuery(expr)
}

selectorsCt := countSelectors(sq.Expr)

// Evaluate constants within the frontend engine
if selectorsCt == 0 {
return expr, false, nil
}

// Filter out subqueries that are just selectors, they are fast enough that they aren't worth spinning off.
if selectorsCt == 1 && !isComplexExpr(sq.Expr) {
return downstreamQuery(expr)
}

step := sq.Step
if step == 0 {
if m.defaultStepFunc == nil {
return nil, false, errors.New("defaultStepFunc is not set")
}
step = time.Duration(m.defaultStepFunc(sq.Range.Milliseconds())) * time.Millisecond
}

// Filter out subqueries with less than 10 steps as they are not worth spinning off.
numberOfSteps := int(sq.Range / step)
if numberOfSteps < 10 {
return downstreamQuery(expr)
}

selector := &parser.VectorSelector{
Name: SubqueryMetricName,
LabelMatchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, SubqueryQueryLabelName, sq.Expr.String()),
labels.MustNewMatcher(labels.MatchEqual, SubqueryRangeLabelName, sq.Range.String()),
labels.MustNewMatcher(labels.MatchEqual, SubqueryStepLabelName, step.String()),
},
}

if sq.OriginalOffset != 0 {
selector.LabelMatchers = append(selector.LabelMatchers, labels.MustNewMatcher(labels.MatchEqual, SubqueryOffsetLabelName, sq.OriginalOffset.String()))
selector.OriginalOffset = sq.OriginalOffset
}

e.Args[lastArgIdx] = &parser.MatrixSelector{
VectorSelector: selector,
Range: sq.Range,
}
m.stats.AddSpunOffSubquery()
return e, true, nil
}

return downstreamQuery(expr)
default:
// If there's no subquery in the children, we can abort early and pass the expression through to the downstream execution path.
if !hasSubqueryInChildren(expr) {
return downstreamQuery(expr)
}
return expr, false, nil
}
}

func isComplexExpr(expr parser.Node) bool {
switch e := expr.(type) {
case *parser.SubqueryExpr:
return true
case *parser.AggregateExpr:
return countSelectors(e.Expr) > 0
case *parser.Call:
for _, arg := range e.Args {
if _, ok := arg.(*parser.MatrixSelector); ok || isComplexExpr(arg) {
return true
}
}
return false
default:
for _, child := range parser.Children(e) {
if isComplexExpr(child) {
return true
}
}
return false
}
}

func hasSubqueryInChildren(expr parser.Node) bool {
switch e := expr.(type) {
case *parser.SubqueryExpr:
return true
default:
for _, child := range parser.Children(e) {
if hasSubqueryInChildren(child) {
return true
}
}
return false
}
}

func countSelectors(expr parser.Node) int {
switch e := expr.(type) {
case *parser.VectorSelector, *parser.MatrixSelector:
return 1
default:
count := 0
for _, child := range parser.Children(e) {
count += countSelectors(child)
}
return count
}
}
28 changes: 28 additions & 0 deletions pkg/frontend/querymiddleware/astmapper/subquery_spin_off_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// SPDX-License-Identifier: AGPL-3.0-only

package astmapper

type SubquerySpinOffMapperStats struct {
spunOffSubqueries int // counter of subqueries extracted
downstreamQueries int // counter of downstream queries extracted
}

func NewSubquerySpinOffMapperStats() *SubquerySpinOffMapperStats {
return &SubquerySpinOffMapperStats{}
}

func (s *SubquerySpinOffMapperStats) AddSpunOffSubquery() {
s.spunOffSubqueries++
}

func (s *SubquerySpinOffMapperStats) AddDownstreamQuery() {
s.downstreamQueries++
}

func (s *SubquerySpinOffMapperStats) SpunOffSubqueries() int {
return s.spunOffSubqueries
}

func (s *SubquerySpinOffMapperStats) DownstreamQueries() int {
return s.downstreamQueries
}
Loading

0 comments on commit 94a0f30

Please sign in to comment.