query frontend: Subquery spin-off without HTTP call (#10742)
* WIP: Have optional query middleware

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix TODOs, remove URL flag, add enable flag, add integration test

* CHANGELOG

* Address PR comments

- Remove extra range middlewares
- Always test subquery
- Pass a single middleware

* Fix integration tests

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Dimitar Dimitrov <[email protected]>
julienduchesne and dimitarvdimitrov authored Feb 27, 2025
1 parent fa80c90 commit 13ff857
Showing 12 changed files with 98 additions and 256 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -48,7 +48,7 @@
* [ENHANCEMENT] Distributor, querier, ingester and store-gateway: Add support for `limit` parameter for label names and values requests. #10410
* [ENHANCEMENT] Query-frontend: Allow adjustment of queries looking into the future to a maximum duration with experimental `-query-frontend.max-future-query-window` flag. #10547
* [ENHANCEMENT] Ruler: Adds support for filtering results from rule status endpoint by `file[]`, `rule_group[]` and `rule_name[]`. #10589
* [ENHANCEMENT] Query-frontend: Add option to "spin off" subqueries as actual range queries, so that they benefit from query acceleration techniques such as sharding, splitting and caching. To enable this, set the `-query-frontend.spin-off-instant-subqueries-to-url=<url>` option on the frontend and the `instant_queries_with_subquery_spin_off` per-tenant override with regular expressions matching the queries to enable. #10460 #10603 #10621
* [ENHANCEMENT] Query-frontend: Add option to "spin off" subqueries as actual range queries, so that they benefit from query acceleration techniques such as sharding, splitting, and caching. To enable this feature, set the `-query-frontend.instant-queries-with-subquery-spin-off=<comma separated list>` option on the frontend or the `instant_queries_with_subquery_spin_off` per-tenant override with regular expressions matching the queries to enable. #10460 #10603 #10621 #10742
* [ENHANCEMENT] Querier, ingester: The series API respects passed `limit` parameter. #10620 #10652
* [ENHANCEMENT] Store-gateway: Add experimental settings under `-store-gateway.dynamic-replication` to allow more than the default of 3 store-gateways to own recent blocks. #10382 #10637
* [ENHANCEMENT] Ingester: Add reactive concurrency limiters to protect push and read operations from overload. #10574
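The changelog entry above describes enabling spin-off through a comma-separated list of regular expressions that are matched against instant queries. A minimal sketch of that matching logic follows. The helper names `parsePatterns` and `shouldSpinOff` are hypothetical and not taken from the Mimir codebase, and unanchored matching is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parsePatterns splits a comma-separated flag value and compiles each entry.
// Hypothetical helper for illustration only; not part of Mimir.
func parsePatterns(flagValue string) ([]*regexp.Regexp, error) {
	var compiled []*regexp.Regexp
	for _, p := range strings.Split(flagValue, ",") {
		p = strings.TrimSpace(p)
		if p == "" {
			continue // assumption: an empty value yields no patterns
		}
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid pattern %q: %w", p, err)
		}
		compiled = append(compiled, re)
	}
	return compiled, nil
}

// shouldSpinOff reports whether the query matches at least one pattern.
// Assumption: matching is unanchored unless the pattern anchors itself.
func shouldSpinOff(patterns []*regexp.Regexp, query string) bool {
	for _, re := range patterns {
		if re.MatchString(query) {
			return true
		}
	}
	return false
}

func main() {
	patterns, err := parsePatterns(`^sum_over_time\(.*,^max_over_time\(.*`)
	if err != nil {
		panic(err)
	}
	fmt.Println(shouldSpinOff(patterns, "sum_over_time(up[6h:15m])")) // true
	fmt.Println(shouldSpinOff(patterns, "up"))                        // false
}
```

With a single `.*` pattern, as used in the integration test in this commit, every instant query becomes a candidate.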
16 changes: 3 additions & 13 deletions cmd/mimir/config-descriptor.json
@@ -4712,8 +4712,9 @@
"required": false,
"desc": "List of regular expression patterns matching instant queries. Subqueries within those instant queries will be spun off as range queries to optimize their performance.",
"fieldValue": null,
"fieldDefaultValue": [],
"fieldType": "list of strings",
"fieldDefaultValue": "",
"fieldFlag": "query-frontend.instant-queries-with-subquery-spin-off",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
@@ -6963,17 +6964,6 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "spin_off_instant_subqueries_to_url",
"required": false,
"desc": "If set, subqueries in instant queries are spun off as range queries and sent to the given URL. This parameter also requires you to set `instant_queries_with_subquery_spin_off` for the tenant.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "query-frontend.spin-off-instant-subqueries-to-url",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "query_result_response_format",
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -2343,6 +2343,8 @@ Usage of ./cmd/mimir/mimir:
List of network interface names to look up when finding the instance IP address. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend. (default [<private network interfaces>])
-query-frontend.instance-port int
Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).
-query-frontend.instant-queries-with-subquery-spin-off comma-separated-list-of-strings
[experimental] List of regular expression patterns matching instant queries. Subqueries within those instant queries will be spun off as range queries to optimize their performance.
-query-frontend.log-queries-longer-than duration
Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.
-query-frontend.log-query-request-headers comma-separated-list-of-strings
@@ -2497,8 +2499,6 @@ Usage of ./cmd/mimir/mimir:
Number of concurrent workers forwarding queries to single query-scheduler. (default 5)
-query-frontend.shard-active-series-queries
[experimental] True to enable sharding of active series queries.
-query-frontend.spin-off-instant-subqueries-to-url string
[experimental] If set, subqueries in instant queries are spun off as range queries and sent to the given URL. This parameter also requires you to set `instant_queries_with_subquery_spin_off` for the tenant.
-query-frontend.split-instant-queries-by-interval duration
[experimental] Split instant queries by an interval and execute in parallel. 0 to disable it.
-query-frontend.split-queries-by-interval duration
2 changes: 1 addition & 1 deletion docs/sources/mimir/configure/about-versioning.md
@@ -214,7 +214,7 @@ The following features are currently experimental:
- Server-side write timeout for responses to active series requests (`-query-frontend.active-series-write-timeout`)
- Caching of non-transient error responses (`-query-frontend.cache-errors`, `-query-frontend.results-cache-ttl-for-errors`)
- Blocking HTTP requests on a per-tenant basis (configured with the `blocked_requests` limit)
- Spinning off (as actual range queries) subqueries from instant queries (`-query-frontend.spin-off-instant-subqueries-to-url` and the `instant_queries_with_subquery_spin_off` per-tenant limit)
- Spinning off (as actual range queries) subqueries from instant queries (`-query-frontend.instant-queries-with-subquery-spin-off` and the `instant_queries_with_subquery_spin_off` per-tenant limit)
- Enable PromQL experimental functions per-tenant (`-query-frontend.enabled-promql-experimental-functions` and the `enabled_promql_experimental_functions` per-tenant limit)
- Query-scheduler
- `-query-scheduler.querier-forget-delay`
@@ -1854,12 +1854,6 @@ results_cache:
# CLI flag: -query-frontend.use-active-series-decoder
[use_active_series_decoder: <boolean> | default = false]

# (experimental) If set, subqueries in instant queries are spun off as range
# queries and sent to the given URL. This parameter also requires you to set
# `instant_queries_with_subquery_spin_off` for the tenant.
# CLI flag: -query-frontend.spin-off-instant-subqueries-to-url
[spin_off_instant_subqueries_to_url: <string> | default = ""]

# Format to use when retrieving query results from queriers. Supported values:
# json, protobuf
# CLI flag: -query-frontend.query-result-response-format
@@ -3752,7 +3746,8 @@ The `limits` block configures default and per-tenant limits imposed by component
# (experimental) List of regular expression patterns matching instant queries.
# Subqueries within those instant queries will be spun off as range queries to
# optimize their performance.
[instant_queries_with_subquery_spin_off: <list of strings> | default = ]
# CLI flag: -query-frontend.instant-queries-with-subquery-spin-off
[instant_queries_with_subquery_spin_off: <string> | default = ""]

# (experimental) Mutate incoming queries that look far into the future to only
# look into the future by the set duration. 0 to disable.
25 changes: 19 additions & 6 deletions integration/query_frontend_test.go
@@ -270,10 +270,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
configFile, flags := cfg.setup(t, s)

flags = mergeFlags(flags, map[string]string{
"-query-frontend.cache-results": "true",
"-query-frontend.results-cache.backend": "memcached",
"-query-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-query-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
"-query-frontend.cache-results": "true",
"-query-frontend.results-cache.backend": "memcached",
"-query-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-query-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
"-query-frontend.instant-queries-with-subquery-spin-off": ".*",
})

// Start the query-scheduler if enabled.
@@ -396,6 +397,16 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
require.Len(t, result, 0)
}

if userID == 0 {
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(0), "cortex_frontend_spun_off_subqueries_total"))
result, err := c.Query("sum_over_time(((count(series_1) * count(series_1)) or vector(1))[6h:15m])", now)
require.NoError(t, err)
require.Len(t, result, 1)
require.Equal(t, result.(model.Vector)[0].Metric, model.Metric{})
require.Equal(t, result.(model.Vector)[0].Value, model.SampleValue(24)) // vector(1) for each step
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_frontend_spun_off_subqueries_total"))
}

for q := 0; q < numQueriesPerUser; q++ {
go func() {
defer wg.Done()
@@ -411,8 +422,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
wg.Wait()

// Compute the expected number of queries.
expectedQueriesCount := float64(numUsers*numQueriesPerUser) + 2
expectedIngesterQueriesCount := float64(numUsers * numQueriesPerUser) // The "time()" query and the query with time range < "query ingesters within" are not pushed down to ingesters.
expectedQueriesCount := float64(numUsers*numQueriesPerUser) + 3
// The "time()" query and the query with time range < "query ingesters within" are not pushed down to ingesters.
// +2 because the spun off subquery ends up as additional ingester queries.
expectedIngesterQueriesCount := float64(numUsers*numQueriesPerUser) + 2
if cfg.queryStatsEnabled {
expectedQueriesCount++
expectedIngesterQueriesCount++
57 changes: 21 additions & 36 deletions pkg/frontend/querymiddleware/roundtrip.go
@@ -15,7 +15,6 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/grafana/regexp"
@@ -57,18 +56,17 @@ var (

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" category:"advanced"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
CacheErrors bool `yaml:"cache_errors" category:"experimental"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
NotRunningTimeout time.Duration `yaml:"not_running_timeout" category:"advanced"`
ShardedQueries bool `yaml:"parallelize_shardable_queries"`
PrunedQueries bool `yaml:"prune_queries" category:"experimental"`
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`
ShardActiveSeriesQueries bool `yaml:"shard_active_series_queries" category:"experimental"`
UseActiveSeriesDecoder bool `yaml:"use_active_series_decoder" category:"experimental"`
SpinOffInstantSubqueriesToURL string `yaml:"spin_off_instant_subqueries_to_url" category:"experimental"`
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" category:"advanced"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
CacheErrors bool `yaml:"cache_errors" category:"experimental"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
NotRunningTimeout time.Duration `yaml:"not_running_timeout" category:"advanced"`
ShardedQueries bool `yaml:"parallelize_shardable_queries"`
PrunedQueries bool `yaml:"prune_queries" category:"experimental"`
TargetSeriesPerShard uint64 `yaml:"query_sharding_target_series_per_shard" category:"advanced"`
ShardActiveSeriesQueries bool `yaml:"shard_active_series_queries" category:"experimental"`
UseActiveSeriesDecoder bool `yaml:"use_active_series_decoder" category:"experimental"`

// CacheKeyGenerator allows to inject a CacheKeyGenerator to use for generating cache keys.
// If nil, the querymiddleware package uses a DefaultCacheKeyGenerator with SplitQueriesByInterval.
@@ -100,7 +98,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.QueryResultResponseFormat, "query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from queriers. Supported values: %s", strings.Join(allFormats, ", ")))
f.BoolVar(&cfg.ShardActiveSeriesQueries, "query-frontend.shard-active-series-queries", false, "True to enable sharding of active series queries.")
f.BoolVar(&cfg.UseActiveSeriesDecoder, "query-frontend.use-active-series-decoder", false, "Set to true to use the zero-allocation response decoder for active series queries.")
f.StringVar(&cfg.SpinOffInstantSubqueriesToURL, "query-frontend.spin-off-instant-subqueries-to-url", "", "If set, subqueries in instant queries are spun off as range queries and sent to the given URL. This parameter also requires you to set `instant_queries_with_subquery_spin_off` for the tenant.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}

@@ -379,24 +376,6 @@ func newQueryMiddlewares(
newStepAlignMiddleware(limits, log, registerer),
)

if spinOffURL := cfg.SpinOffInstantSubqueriesToURL; spinOffURL != "" {
// Spin-off subqueries to a remote URL (or localhost)
// Add the retry middleware to the spin-off query handler.
// Spun-off queries are terminated in that handler (they don't call "next" so the retry middleware has to be added here).
spinOffQueryHandler, err := newSpinOffQueryHandler(codec, log, spinOffURL, cfg.MaxRetries, retryMiddlewareMetrics)
if err != nil {
level.Error(log).Log("msg", "failed to create spin-off query handler", "error", err)
} else {
// We're only interested in instant queries for now because their query rate is usually much higher than range queries.
// They are also less optimized than range queries, so we can benefit more from the spin-off.
queryInstantMiddleware = append(
queryInstantMiddleware,
newInstrumentMiddleware("spin_off_subqueries", metrics),
newSpinOffSubqueriesMiddleware(limits, log, engine, spinOffQueryHandler, registerer, defaultStepFunc),
)
}
}

if cfg.CacheResults && cfg.CacheErrors {
queryRangeMiddleware = append(
queryRangeMiddleware,
@@ -405,9 +384,10 @@ func newQueryMiddlewares(
)
}

// Inject the middleware to split requests by interval + results cache (if at least one of the two is enabled).
// Create split and cache middleware if either splitting or caching is enabled
var splitAndCacheMiddleware MetricsQueryMiddleware
if cfg.SplitQueriesByInterval > 0 || cfg.CacheResults {
queryRangeMiddleware = append(queryRangeMiddleware, newInstrumentMiddleware("split_by_interval_and_results_cache", metrics), newSplitAndCacheMiddleware(
splitAndCacheMiddleware = newSplitAndCacheMiddleware(
cfg.SplitQueriesByInterval > 0,
cfg.CacheResults,
cfg.SplitQueriesByInterval,
@@ -419,11 +399,10 @@ func newQueryMiddlewares(
resultsCacheEnabledByOption,
log,
registerer,
))
)
}

queryInstantMiddleware = append(queryInstantMiddleware,
// Track query range statistics. Added first before any subsequent middleware modifies the request.
queryStatsMiddleware,
newLimitsMiddleware(limits, log),
newSplitInstantQueryByIntervalMiddleware(limits, log, engine, registerer),
@@ -432,6 +411,12 @@ func newQueryMiddlewares(
prom2CompatMiddleware,
)

queryInstantMiddleware = append(
queryInstantMiddleware,
newInstrumentMiddleware("spin_off_subqueries", metrics),
newSpinOffSubqueriesMiddleware(limits, log, engine, registerer, splitAndCacheMiddleware, defaultStepFunc),
)

// Inject the extra middlewares provided by the user before the query pruning and query sharding middleware.
if len(cfg.ExtraInstantQueryMiddlewares) > 0 {
queryInstantMiddleware = append(queryInstantMiddleware, cfg.ExtraInstantQueryMiddlewares...)
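The `roundtrip.go` changes replace the HTTP-based spin-off handler with a middleware that receives the split-and-cache middleware directly. The toy sketch below shows the general shape of that idea: subqueries are routed in-process through a range middleware wrapped around the same downstream handler. All types and names here (`Handler`, `Middleware`, `newSpinOffMiddleware`, the `:` based subquery detection) are hypothetical simplifications, not Mimir's actual interfaces. In particular, the real middleware extracts subqueries from the parsed expression rather than routing the whole query, and treating a nil range middleware as a pass-through is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Handler executes a query. Hypothetical stand-in for a query handler interface.
type Handler interface {
	Do(query string) (string, error)
}

// HandlerFunc adapts a function to the Handler interface.
type HandlerFunc func(query string) (string, error)

func (f HandlerFunc) Do(query string) (string, error) { return f(query) }

// Middleware wraps a Handler with extra behavior.
type Middleware func(next Handler) Handler

// newTagMiddleware stands in for a split-and-cache middleware: it only
// tags the result so the path taken is visible in the output.
func newTagMiddleware(tag string) Middleware {
	return func(next Handler) Handler {
		return HandlerFunc(func(q string) (string, error) {
			res, err := next.Do(q)
			if err != nil {
				return "", err
			}
			return tag + "(" + res + ")", nil
		})
	}
}

// newSpinOffMiddleware routes queries that look like subqueries through
// rangeMW in-process; everything else goes straight to next.
func newSpinOffMiddleware(rangeMW Middleware) Middleware {
	return func(next Handler) Handler {
		rangeHandler := next
		if rangeMW != nil { // assumption: nil means no range middleware is configured
			rangeHandler = rangeMW(next)
		}
		return HandlerFunc(func(q string) (string, error) {
			if strings.Contains(q, ":") { // toy subquery detection, e.g. [6h:15m]
				return rangeHandler.Do(q)
			}
			return next.Do(q)
		})
	}
}

func main() {
	querier := HandlerFunc(func(q string) (string, error) { return "querier:" + q, nil })
	chain := newSpinOffMiddleware(newTagMiddleware("split+cache"))(querier)

	out, _ := chain.Do("up[6h:15m]")
	fmt.Println(out) // split+cache(querier:up[6h:15m])
	out, _ = chain.Do("up")
	fmt.Println(out) // querier:up
}
```

Passing the range middleware in as a value avoids a network round trip and the separate retry wiring that the removed HTTP handler needed.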
1 change: 0 additions & 1 deletion pkg/frontend/querymiddleware/roundtrip_test.go
@@ -589,7 +589,6 @@ func TestMiddlewaresConsistency(t *testing.T) {
cfg.CacheResults = true
cfg.ShardedQueries = true
cfg.PrunedQueries = true
cfg.SpinOffInstantSubqueriesToURL = "http://localhost"

// Ensure all features are enabled, so that we assert on all middlewares.
require.NotZero(t, cfg.CacheResults)