Skip to content

Commit

Permalink
query-frontend: Fix cache keys for dynamic split intervals (thanos-io…
Browse files Browse the repository at this point in the history
  • Loading branch information
lachruzam authored Nov 5, 2024
1 parent 9bc3cc0 commit ebfc03e
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 129 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.
- [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
34 changes: 19 additions & 15 deletions pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,38 @@ import (

// thanosCacheKeyGenerator is a utility for using split interval when determining cache keys.
type thanosCacheKeyGenerator struct {
interval queryrange.IntervalFn
resolutions []int64
}

func newThanosCacheKeyGenerator(intervalFn queryrange.IntervalFn) thanosCacheKeyGenerator {
func newThanosCacheKeyGenerator() thanosCacheKeyGenerator {
return thanosCacheKeyGenerator{
interval: intervalFn,
resolutions: []int64{downsample.ResLevel2, downsample.ResLevel1, downsample.ResLevel0},
}
}

// GenerateCacheKey generates a cache key based on the Request and interval.
// TODO(yeya24): Add other request params as request key.
func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Request) string {
currentInterval := r.GetStart() / t.interval(r).Milliseconds()
switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
if sr, ok := r.(SplitRequest); ok {
splitInterval := sr.GetSplitInterval().Milliseconds()
currentInterval := r.GetStart() / splitInterval

switch tr := r.(type) {
case *ThanosQueryRangeRequest:
i := 0
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, splitInterval, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d:%d", userID, tr.Label, tr.Matchers, splitInterval, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, tr.Matchers, splitInterval, currentInterval)
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey, tr.LookbackDelta, tr.Engine)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
return fmt.Sprintf("fe:%s:%s:%d", userID, tr.Matchers, currentInterval)
}
return fmt.Sprintf("fe:%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)

// all possible request types are already covered
panic("request type not supported")
}

func generateShardInfoKey(r *ThanosQueryRangeRequest) string {
Expand Down
96 changes: 59 additions & 37 deletions pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,32 @@ import (
)

func TestGenerateCacheKey(t *testing.T) {
intervalFn := func(r queryrange.Request) time.Duration { return hour }
splitter := newThanosCacheKeyGenerator(intervalFn)
splitter := newThanosCacheKeyGenerator()

for _, tc := range []struct {
name string
req queryrange.Request
expected string
}{
{
name: "non thanos req",
req: &queryrange.PrometheusRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0",
},
{
name: "non downsampling resolution specified",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
Query: "up",
Start: 0,
Step: 60 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:60000:0:2:-:0:",
expected: "fe::up:60000:3600000:0:2:-:0:",
},
{
name: "10s step",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
Query: "up",
Start: 0,
Step: 10 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:2:-:0:",
expected: "fe::up:10000:3600000:0:2:-:0:",
},
{
name: "1m downsampling resolution",
Expand All @@ -57,8 +49,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:2:-:0:",
expected: "fe::up:10000:3600000:0:2:-:0:",
},
{
name: "5m downsampling resolution, different cache key",
Expand All @@ -67,8 +60,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:1:-:0:",
expected: "fe::up:10000:3600000:0:1:-:0:",
},
{
name: "1h downsampling resolution, different cache key",
Expand All @@ -77,8 +71,9 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: hour,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:0:-:0:",
expected: "fe::up:10000:3600000:0:0:-:0:",
},
{
name: "1h downsampling resolution with lookback delta",
Expand All @@ -88,23 +83,26 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
LookbackDelta: 1000,
SplitInterval: time.Hour,
},
expected: "fe::up:10000:0:0:-:1000:",
expected: "fe::up:10000:3600000:0:0:-:1000:",
},
{
name: "label names, no matcher",
req: &ThanosLabelsRequest{
Start: 0,
Start: 0,
SplitInterval: time.Hour,
},
expected: "fe:::[]:0",
expected: "fe:::[]:3600000:0",
},
{
name: "label names, single matcher",
req: &ThanosLabelsRequest{
Start: 0,
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Start: 0,
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
SplitInterval: time.Hour,
},
expected: `fe:::[[foo="bar"]]:0`,
expected: `fe:::[[foo="bar"]]:3600000:0`,
},
{
name: "label names, multiple matchers",
Expand All @@ -114,25 +112,28 @@ func TestGenerateCacheKey(t *testing.T) {
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
{labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")},
},
SplitInterval: time.Hour,
},
expected: `fe:::[[foo="bar"] [baz="qux"]]:0`,
expected: `fe:::[[foo="bar"] [baz="qux"]]:3600000:0`,
},
{
name: "label values, no matcher",
req: &ThanosLabelsRequest{
Start: 0,
Label: "up",
Start: 0,
Label: "up",
SplitInterval: time.Hour,
},
expected: "fe::up:[]:0",
expected: "fe::up:[]:3600000:0",
},
{
name: "label values, single matcher",
req: &ThanosLabelsRequest{
Start: 0,
Label: "up",
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
Start: 0,
Label: "up",
Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
SplitInterval: time.Hour,
},
expected: `fe::up:[[foo="bar"]]:0`,
expected: `fe::up:[[foo="bar"]]:3600000:0`,
},
{
name: "label values, multiple matchers",
Expand All @@ -143,8 +144,9 @@ func TestGenerateCacheKey(t *testing.T) {
{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
{labels.MustNewMatcher(labels.MatchEqual, "baz", "qux")},
},
SplitInterval: time.Hour,
},
expected: `fe::up:[[foo="bar"] [baz="qux"]]:0`,
expected: `fe::up:[[foo="bar"] [baz="qux"]]:3600000:0`,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -153,3 +155,23 @@ func TestGenerateCacheKey(t *testing.T) {
})
}
}

func TestGenerateCacheKey_UnsupportedRequest(t *testing.T) {
splitter := newThanosCacheKeyGenerator()

req := &queryrange.PrometheusRequest{
Query: "up",
Start: 0,
Step: 60 * seconds,
}

defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic")
} else {
testutil.Assert(t, r == "request type not supported", "unexpected panic: %v", r)
}
}()

splitter.GenerateCacheKey("", req)
}
36 changes: 36 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type ShardedRequest interface {
WithShardInfo(info *storepb.ShardInfo) queryrange.Request
}

// SplitRequest interface represents a query request that can be split horizontally.
type SplitRequest interface {
GetSplitInterval() time.Duration
WithSplitInterval(interval time.Duration) queryrange.Request
}

type RequestHeader struct {
Name string
Values []string
Expand Down Expand Up @@ -57,6 +63,7 @@ type ThanosQueryRangeRequest struct {
LookbackDelta int64
Analyze bool
Engine string
SplitInterval time.Duration
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand All @@ -83,6 +90,8 @@ func (r *ThanosQueryRangeRequest) GetCachingOptions() queryrange.CachingOptions

func (r *ThanosQueryRangeRequest) GetStats() string { return r.Stats }

func (r *ThanosQueryRangeRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosQueryRangeRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand Down Expand Up @@ -111,6 +120,13 @@ func (r *ThanosQueryRangeRequest) WithShardInfo(info *storepb.ShardInfo) queryra
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosQueryRangeRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosQueryRangeRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down Expand Up @@ -246,6 +262,7 @@ type ThanosLabelsRequest struct {
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
Stats string
SplitInterval time.Duration
}

// GetStoreMatchers returns store matches.
Expand All @@ -268,6 +285,8 @@ func (r *ThanosLabelsRequest) GetCachingOptions() queryrange.CachingOptions { re

func (r *ThanosLabelsRequest) GetStats() string { return r.Stats }

func (r *ThanosLabelsRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosLabelsRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand All @@ -288,6 +307,13 @@ func (r *ThanosLabelsRequest) WithQuery(_ string) queryrange.Request {
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosLabelsRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosLabelsRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down Expand Up @@ -328,6 +354,7 @@ type ThanosSeriesRequest struct {
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
Stats string
SplitInterval time.Duration
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand All @@ -353,6 +380,8 @@ func (r *ThanosSeriesRequest) GetCachingOptions() queryrange.CachingOptions { re

func (r *ThanosSeriesRequest) GetStats() string { return r.Stats }

func (r *ThanosSeriesRequest) GetSplitInterval() time.Duration { return r.SplitInterval }

func (r *ThanosSeriesRequest) WithStats(stats string) queryrange.Request {
q := *r
q.Stats = stats
Expand All @@ -373,6 +402,13 @@ func (r *ThanosSeriesRequest) WithQuery(_ string) queryrange.Request {
return &q
}

// WithSplitInterval clones the current request with a different split interval.
func (r *ThanosSeriesRequest) WithSplitInterval(interval time.Duration) queryrange.Request {
q := *r
q.SplitInterval = interval
return &q
}

// LogToSpan writes information about this request to an OpenTracing span.
func (r *ThanosSeriesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
Expand Down
5 changes: 2 additions & 3 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func newQueryRangeTripperware(
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(dynamicIntervalFn(config)),
newThanosCacheKeyGenerator(),
limits,
codec,
queryrange.PrometheusResponseExtractor{},
Expand Down Expand Up @@ -299,11 +299,10 @@ func newLabelsTripperware(
}

if config.ResultsCacheConfig != nil {
staticIntervalFn := func(_ queryrange.Request) time.Duration { return config.SplitQueriesByInterval }
queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware(
logger,
*config.ResultsCacheConfig,
newThanosCacheKeyGenerator(staticIntervalFn),
newThanosCacheKeyGenerator(),
limits,
codec,
ThanosResponseExtractor{},
Expand Down
Loading

0 comments on commit ebfc03e

Please sign in to comment.