MQE: Add quantile aggregation (#10755)
* MQE: Add quantile aggregation

* Address review nits

* Add extra test case for changing scalar

* Extend test to have no values and emit no warnings for bad params

* Avoid recreating param annotation func

* Add benchmark

* Fix test file

* Remove quantile from unsupported function list

* Update benchmark query

* Refactor to avoid duplicate checks of the param

* Extend tests

* Address review feedback
jhesketh authored Mar 7, 2025
1 parent b10d3fb commit cccd3cb
Showing 17 changed files with 390 additions and 66 deletions.
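
For context on the semantics being added: PromQL's quantile aggregation computes, at each output step, the φ-quantile of the float samples in each group using linear interpolation between the two nearest ranks; native histograms are ignored, and a parameter outside [0, 1] still evaluates but records a warning annotation. The following is a minimal, standard-library-only sketch of that calculation, included for illustration — it is not the quantile.go added by this commit (that file is not among the diffs shown on this page).

package main

import (
	"fmt"
	"math"
	"sort"
)

// quantileOfValues computes the φ-quantile the same way PromQL's quantile()
// aggregation does: sort the values and linearly interpolate between the two
// nearest ranks. φ < 0 yields -Inf and φ > 1 yields +Inf; the engine
// additionally records a warning annotation in those cases.
func quantileOfValues(q float64, values []float64) float64 {
	if len(values) == 0 || math.IsNaN(q) {
		return math.NaN()
	}
	if q < 0 {
		return math.Inf(-1)
	}
	if q > 1 {
		return math.Inf(+1)
	}

	sort.Float64s(values)

	rank := q * float64(len(values)-1)
	lower := int(math.Floor(rank))
	upper := int(math.Ceil(rank))
	weight := rank - math.Floor(rank)

	return values[lower]*(1-weight) + values[upper]*weight
}

func main() {
	// quantile(0.9, ...) over one group containing the values 1..5.
	fmt.Println(quantileOfValues(0.9, []float64{1, 2, 3, 4, 5})) // 4.6
}

The two benchmark queries added below exercise this path once without grouping and once grouped by (le).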
6 changes: 6 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -275,6 +275,12 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "topk by (le) (5, h_X)",
},
+{
+Expr: "quantile(0.9, a_X)",
+},
+{
+Expr: "quantile by (le) (0.1, h_X)",
+},
// Combinations.
{
Expr: "rate(a_X[1m]) + rate(b_X[1m])",
30 changes: 27 additions & 3 deletions pkg/streamingpromql/engine_test.go
@@ -50,7 +50,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
-"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
+"absent_over_time(nonexistent{}[1h])": "'absent_over_time' function",
}

for expression, expectedError := range unsupportedExpressions {
@@ -2057,6 +2057,11 @@ func runAnnotationTests(t *testing.T, testCases map[string]annotationTestCase) {
}

func TestAnnotations(t *testing.T) {
+floatData := `
+metric{type="float", series="1"} 0+1x3
+metric{type="float", series="2"} 1+1x3
+`

mixedFloatHistogramData := `
metric{type="float", series="1"} 0+1x3
metric{type="float", series="2"} 1+1x3
@@ -2070,14 +2075,14 @@ func TestAnnotations(t *testing.T) {
metric{series="custom-buckets-2"} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}}+{{schema:-53 sum:5 count:4 custom_values:[2 3] buckets:[1 2 1]}}x3
metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}
metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}
-`
+`

nativeHistogramsWithResetHintsMix := `
metric{reset_hint="unknown"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="gauge"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
metric{reset_hint="gauge-unknown"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{reset_hint="unknown-gauge"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3
-`
+`

testCases := map[string]annotationTestCase{
"sum() with float and native histogram at same step": {
@@ -2344,6 +2349,21 @@ func TestAnnotations(t *testing.T) {
`PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "other_float_metric" (1:105)`,
},
},

+"quantile with mixed histograms": {
+data: mixedFloatHistogramData,
+expr: "quantile(0.9, metric)",
+expectedInfoAnnotations: []string{
+`PromQL info: ignored histogram in quantile aggregation (1:15)`,
+},
+},
+"quantile with invalid param": {
+data: floatData,
+expr: "quantile(1.5, metric)",
+expectedWarningAnnotations: []string{
+`PromQL warning: quantile value should be between 0 and 1, got 1.5 (1:10)`,
+},
+},
}

for _, f := range []string{"min_over_time", "max_over_time", "stddev_over_time", "stdvar_over_time"} {
@@ -2996,6 +3016,10 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) {
expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
}
+// NOTE(jhesketh): We do not test a changing quantile factor here as prometheus currently
+// does not support it (https://github.com/prometheus/prometheus/issues/15971)
+expressions = append(expressions, fmt.Sprintf(`quantile (0.9, series{label=~"(%s)"})`, labelRegex))
+expressions = append(expressions, fmt.Sprintf(`quantile by (group) (0.9, series{label=~"(%s)"})`, labelRegex))

expressions = append(expressions, fmt.Sprintf(`count_values("value", series{label="%s"})`, labelRegex))
}
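
The two new TestAnnotations cases above pin down the user-visible behaviour: native histograms are skipped with an info annotation, and an out-of-range parameter only produces a warning rather than failing the query. Prometheus' util/annotations package already provides a constructor for that warning; the fragment below shows how the expected message is formed. It is illustrative only — whether MQE calls this exact helper is not visible in the diffs shown here.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser/posrange"
	"github.com/prometheus/prometheus/util/annotations"
)

func main() {
	annos := annotations.New()

	// In `quantile(1.5, metric)` the literal 1.5 starts at offset 9,
	// which renders as position 1:10 in the expected annotation above.
	pos := posrange.PositionRange{Start: 9, End: 12}

	q := 1.5
	if q < 0 || q > 1 {
		annos.Add(annotations.NewInvalidQuantileWarning(q, pos))
	}

	for _, err := range *annos {
		fmt.Println(err) // PromQL warning: quantile value should be between 0 and 1, got 1.5
	}
}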
11 changes: 9 additions & 2 deletions pkg/streamingpromql/operators/aggregations/aggregation.go
@@ -45,6 +45,11 @@ type Aggregation struct {
remainingGroups []*group // One entry per group, in the order we want to return them

haveEmittedMixedFloatsAndHistogramsWarning bool

+// If the aggregation has a parameter, its values are expected
+// to be filled here by the wrapping operator.
+// Currently only used by the quantile aggregation.
+ParamData types.ScalarData
}

func NewAggregation(
@@ -218,7 +223,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
}

// Construct the group and return it
-seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.TimeRange, a.MemoryConsumptionTracker)
+seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.ParamData, a.TimeRange, a.MemoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
@@ -248,7 +253,7 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group

thisSeriesGroup := a.remainingInnerSeriesToGroup[0]
a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:]
-if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil {
+if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange, a.MemoryConsumptionTracker, a.emitAnnotationFunc, thisSeriesGroup.remainingSeriesCount); err != nil {
return err
}
thisSeriesGroup.remainingSeriesCount--
@@ -264,6 +269,8 @@ func (a *Aggregation) emitAnnotation(generator types.AnnotationGenerator) {
}

func (a *Aggregation) Close() {
+// The wrapping operator is responsible for returning any a.ParamData slice
+// since it is responsible for setting them up.
a.Inner.Close()
}

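
The new ParamData field and the comment added to Close() describe an ownership contract whose other half is not shown on this page: some operator wrapping Aggregation evaluates the scalar parameter, stores it in ParamData before any series are read, and later returns the slice it allocated. The sketch below only illustrates that contract; paramSource and its Values method are hypothetical stand-ins, not MQE APIs.

package sketch

import (
	"context"

	"github.com/grafana/mimir/pkg/streamingpromql/operators/aggregations"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// paramSource is a hypothetical stand-in for whatever operator evaluates the
// scalar parameter (the 0.9 in quantile(0.9, ...)) over the query time range.
type paramSource interface {
	Values(ctx context.Context) (types.ScalarData, error)
}

// quantileWrapper illustrates the contract described in aggregation.go above:
// fill ParamData before the aggregation is consumed, and take responsibility
// for that slice afterwards.
type quantileWrapper struct {
	inner       *aggregations.Aggregation
	param       paramSource
	paramFilled bool
}

func (w *quantileWrapper) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
	if !w.paramFilled {
		paramData, err := w.param.Values(ctx)
		if err != nil {
			return types.InstantVectorSeriesData{}, err
		}
		w.inner.ParamData = paramData
		w.paramFilled = true
	}

	return w.inner.NextSeries(ctx)
}

func (w *quantileWrapper) Close() {
	// Per the comment added to Aggregation.Close() above, the wrapper, not
	// the aggregation, must return the ParamData slice (elided here) before
	// closing the inner operator.
	w.inner.Close()
}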
@@ -44,7 +44,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h4})
series := types.InstantVectorSeriesData{Histograms: histograms}

-require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
+require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil, 1))
require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series")

// Second series: all histograms that are not retained should be nil-ed out after returning.
@@ -62,7 +62,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h9})
series = types.InstantVectorSeriesData{Histograms: histograms}

-require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
+require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil, 1))

expected := []promql.HPoint{
{T: 0, H: h5}, // h5 not retained (added to h1)
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/aggregations/avg.go
@@ -33,7 +33,7 @@ type AvgAggregationGroup struct {
groupSeriesCounts []float64
}

-func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error {
+func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, _ uint) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
@@ -256,7 +256,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
return floatPointCount, haveMixedFloatsAndHistograms
}

-func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
+func (g *AvgAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
var floatPoints []promql.FPoint
var err error
22 changes: 12 additions & 10 deletions pkg/streamingpromql/operators/aggregations/common.go
@@ -16,22 +16,24 @@ import (
// AggregationGroup accumulates series that have been grouped together and computes the output series data.
type AggregationGroup interface {
// AccumulateSeries takes in a series as part of the group
-AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error
+// remainingSeriesInGroup includes the current series (ie if data is the last series, then remainingSeriesInGroup is 1)
+AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, remainingSeriesInGroup uint) error
// ComputeOutputSeries does any final calculations and returns the grouped series data
-ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
+ComputeOutputSeries(param types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}

type AggregationGroupFactory func() AggregationGroup

var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{
-parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
-parser.COUNT: func() AggregationGroup { return NewCountGroupAggregationGroup(true) },
-parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) },
-parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
-parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
-parser.STDDEV: func() AggregationGroup { return NewStddevStdvarAggregationGroup(true) },
-parser.STDVAR: func() AggregationGroup { return NewStddevStdvarAggregationGroup(false) },
-parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
+parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} },
+parser.COUNT: func() AggregationGroup { return NewCountGroupAggregationGroup(true) },
+parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) },
+parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) },
+parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) },
+parser.QUANTILE: func() AggregationGroup { return &QuantileAggregationGroup{} },
+parser.STDDEV: func() AggregationGroup { return NewStddevStdvarAggregationGroup(true) },
+parser.STDVAR: func() AggregationGroup { return NewStddevStdvarAggregationGroup(false) },
+parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} },
}

// Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored.
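
Both interface changes above serve the new group registered for parser.QUANTILE: unlike sum or avg, a quantile cannot be folded in one value at a time, so the group has to keep every float value per step and needs the per-step parameter when the output is computed, while remainingSeriesInGroup tells it when the last series of a group has arrived (presumably so buffers can be sized or finalised). Since quantile.go itself is not among the diffs shown, the sketch below only illustrates how the extended interface fits together; timeRange.StepCount, timeRange.PointIndex and param.Samples are assumed names and may not match the real MQE types exactly.

package sketch

import (
	"github.com/grafana/mimir/pkg/streamingpromql/limiting"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
	"github.com/prometheus/prometheus/promql"
)

// sketchQuantileGroup is NOT the QuantileAggregationGroup registered above;
// it only shows where the two new interface arguments come into play.
type sketchQuantileGroup struct {
	valuesPerStep [][]float64 // every float value seen so far, per output step
}

func (g *sketchQuantileGroup) AccumulateSeries(
	data types.InstantVectorSeriesData,
	timeRange types.QueryTimeRange,
	memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
	emitAnnotationFunc types.EmitAnnotationFunc,
	remainingSeriesInGroup uint, // 1 means data is the last series of this group
) error {
	if g.valuesPerStep == nil {
		g.valuesPerStep = make([][]float64, timeRange.StepCount) // StepCount: assumed field name
	}

	for _, p := range data.Floats {
		idx := timeRange.PointIndex(p.T) // PointIndex: assumed helper mapping a timestamp to its step index
		g.valuesPerStep[idx] = append(g.valuesPerStep[idx], p.F)
	}

	// Native histograms would be skipped here, emitting the
	// "ignored histogram in quantile aggregation" info annotation via
	// emitAnnotationFunc, as the new TestAnnotations case expects.

	types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
	return nil
}

func (g *sketchQuantileGroup) ComputeOutputSeries(
	param types.ScalarData,
	timeRange types.QueryTimeRange,
	memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
) (types.InstantVectorSeriesData, bool, error) {
	points := make([]promql.FPoint, 0, len(g.valuesPerStep))

	for i, vals := range g.valuesPerStep {
		if len(vals) == 0 {
			continue
		}
		step := param.Samples[i] // Samples: assumed to hold one parameter value per output step
		points = append(points, promql.FPoint{
			T: step.T,
			F: quantileOfValues(step.F, vals), // helper sketched near the top of this page
		})
	}

	return types.InstantVectorSeriesData{Floats: points}, false, nil
}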
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/aggregations/count.go
@@ -37,7 +37,7 @@ func (g *CountGroupAggregationGroup) groupAccumulatePoint(idx int64) {
g.values[idx] = 1
}

-func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc) error {
+func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc, _ uint) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil {
var err error
// First series with values for this group, populate it.
@@ -64,7 +64,7 @@ func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSe
return nil
}

-func (g *CountGroupAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
+func (g *CountGroupAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, fv := range g.values {
if fv > 0 {
4 changes: 2 additions & 2 deletions pkg/streamingpromql/operators/aggregations/min_max.go
@@ -51,7 +51,7 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
}
}

-func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc) error {
+func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc, _ uint) error {
// Native histograms are ignored for min and max.
if len(data.Histograms) > 0 {
emitAnnotation(func(_ string, expressionPosition posrange.PositionRange) error {
@@ -90,7 +90,7 @@ func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeries
return nil
}

-func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
+func (g *MinMaxAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, p := range g.floatPresent {
if p {