diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 31c066852c..35d4c829b2 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -275,6 +275,12 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "topk by (le) (5, h_X)", }, + { + Expr: "quantile(0.9, a_X)", + }, + { + Expr: "quantile by (le) (0.1, h_X)", + }, // Combinations. { Expr: "rate(a_X[1m]) + rate(b_X[1m])", diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 6c2a6171b1..41ddf89d8b 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -50,7 +50,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // The goal of this is not to list every conceivable expression that is unsupported, but to cover all the // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ - "quantile(0.95, metric{})": "'quantile' aggregation with parameter", + "absent_over_time(nonexistent{}[1h])": "'absent_over_time' function", } for expression, expectedError := range unsupportedExpressions { @@ -2057,6 +2057,11 @@ func runAnnotationTests(t *testing.T, testCases map[string]annotationTestCase) { } func TestAnnotations(t *testing.T) { + floatData := ` + metric{type="float", series="1"} 0+1x3 + metric{type="float", series="2"} 1+1x3 + ` + mixedFloatHistogramData := ` metric{type="float", series="1"} 0+1x3 metric{type="float", series="2"} 1+1x3 @@ -2070,14 +2075,14 @@ func TestAnnotations(t *testing.T) { metric{series="custom-buckets-2"} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}}+{{schema:-53 sum:5 count:4 custom_values:[2 3] buckets:[1 2 1]}}x3 metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}} metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}} - ` + ` nativeHistogramsWithResetHintsMix := ` metric{reset_hint="unknown"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3 metric{reset_hint="gauge"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3 metric{reset_hint="gauge-unknown"} {{schema:0 sum:0 count:0 counter_reset_hint:gauge}} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3 metric{reset_hint="unknown-gauge"} {{schema:0 sum:0 count:0}}+{{schema:0 sum:5 count:4 buckets:[1 2 1] counter_reset_hint:gauge}}x3 - ` + ` testCases := map[string]annotationTestCase{ "sum() with float and native histogram at same step": { @@ -2344,6 +2349,21 @@ func TestAnnotations(t *testing.T) { `PromQL info: metric might not be a counter, name does not end in _total/_sum/_count/_bucket: "other_float_metric" (1:105)`, }, }, + + "quantile with mixed histograms": { + data: mixedFloatHistogramData, + expr: "quantile(0.9, metric)", + expectedInfoAnnotations: []string{ + `PromQL info: ignored histogram in quantile aggregation (1:15)`, + }, + }, + "quantile with invalid param": { + data: floatData, + expr: "quantile(1.5, metric)", + expectedWarningAnnotations: []string{ + `PromQL warning: quantile value should be between 0 and 1, got 1.5 (1:10)`, + }, + }, } for _, f := range []string{"min_over_time", "max_over_time", "stddev_over_time", "stdvar_over_time"} { @@ -2996,6 +3016,10 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) { expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex)) } + // NOTE(jhesketh): We do not test a changing quantile factor here as prometheus currently + // does not support it (https://github.com/prometheus/prometheus/issues/15971) + expressions = append(expressions, fmt.Sprintf(`quantile (0.9, series{label=~"(%s)"})`, labelRegex)) + expressions = append(expressions, fmt.Sprintf(`quantile by (group) (0.9, series{label=~"(%s)"})`, labelRegex)) expressions = append(expressions, fmt.Sprintf(`count_values("value", series{label="%s"})`, labelRegex)) } diff --git a/pkg/streamingpromql/operators/aggregations/aggregation.go b/pkg/streamingpromql/operators/aggregations/aggregation.go index 2bf902c148..e6970be27b 100644 --- a/pkg/streamingpromql/operators/aggregations/aggregation.go +++ b/pkg/streamingpromql/operators/aggregations/aggregation.go @@ -45,6 +45,11 @@ type Aggregation struct { remainingGroups []*group // One entry per group, in the order we want to return them haveEmittedMixedFloatsAndHistogramsWarning bool + + // If the aggregation has a parameter, its values are expected + // to be filled here by the wrapping operator. + // Currently only used by the quantile aggregation. + ParamData types.ScalarData } func NewAggregation( @@ -218,7 +223,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries } // Construct the group and return it - seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.TimeRange, a.MemoryConsumptionTracker) + seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.ParamData, a.TimeRange, a.MemoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err } @@ -248,7 +253,7 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group thisSeriesGroup := a.remainingInnerSeriesToGroup[0] a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:] - if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil { + if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange, a.MemoryConsumptionTracker, a.emitAnnotationFunc, thisSeriesGroup.remainingSeriesCount); err != nil { return err } thisSeriesGroup.remainingSeriesCount-- @@ -264,6 +269,8 @@ func (a *Aggregation) emitAnnotation(generator types.AnnotationGenerator) { } func (a *Aggregation) Close() { + // The wrapping operator is responsible for returning any a.ParamData slice + // since it is responsible for setting them up. a.Inner.Close() } diff --git a/pkg/streamingpromql/operators/aggregations/aggregations_safety_test.go b/pkg/streamingpromql/operators/aggregations/aggregations_safety_test.go index 2dd25012c8..fc78b01bc8 100644 --- a/pkg/streamingpromql/operators/aggregations/aggregations_safety_test.go +++ b/pkg/streamingpromql/operators/aggregations/aggregations_safety_test.go @@ -44,7 +44,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { histograms = append(histograms, promql.HPoint{T: 4, H: h4}) series := types.InstantVectorSeriesData{Histograms: histograms} - require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) + require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil, 1)) require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series") // Second series: all histograms that are not retained should be nil-ed out after returning. @@ -62,7 +62,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { histograms = append(histograms, promql.HPoint{T: 4, H: h9}) series = types.InstantVectorSeriesData{Histograms: histograms} - require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) + require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil, 1)) expected := []promql.HPoint{ {T: 0, H: h5}, // h5 not retained (added to h1) diff --git a/pkg/streamingpromql/operators/aggregations/avg.go b/pkg/streamingpromql/operators/aggregations/avg.go index a1e783d617..1c9f31c386 100644 --- a/pkg/streamingpromql/operators/aggregations/avg.go +++ b/pkg/streamingpromql/operators/aggregations/avg.go @@ -33,7 +33,7 @@ type AvgAggregationGroup struct { groupSeriesCounts []float64 } -func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error { +func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, _ uint) error { defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) if len(data.Floats) == 0 && len(data.Histograms) == 0 { // Nothing to do @@ -256,7 +256,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) { return floatPointCount, haveMixedFloatsAndHistograms } -func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *AvgAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints() var floatPoints []promql.FPoint var err error diff --git a/pkg/streamingpromql/operators/aggregations/common.go b/pkg/streamingpromql/operators/aggregations/common.go index 297a0421fb..fc1c7aa17d 100644 --- a/pkg/streamingpromql/operators/aggregations/common.go +++ b/pkg/streamingpromql/operators/aggregations/common.go @@ -16,22 +16,24 @@ import ( // AggregationGroup accumulates series that have been grouped together and computes the output series data. type AggregationGroup interface { // AccumulateSeries takes in a series as part of the group - AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error + // remainingSeriesInGroup includes the current series (ie if data is the last series, then remainingSeriesInGroup is 1) + AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, remainingSeriesInGroup uint) error // ComputeOutputSeries does any final calculations and returns the grouped series data - ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) + ComputeOutputSeries(param types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) } type AggregationGroupFactory func() AggregationGroup var AggregationGroupFactories = map[parser.ItemType]AggregationGroupFactory{ - parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} }, - parser.COUNT: func() AggregationGroup { return NewCountGroupAggregationGroup(true) }, - parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) }, - parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) }, - parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) }, - parser.STDDEV: func() AggregationGroup { return NewStddevStdvarAggregationGroup(true) }, - parser.STDVAR: func() AggregationGroup { return NewStddevStdvarAggregationGroup(false) }, - parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} }, + parser.AVG: func() AggregationGroup { return &AvgAggregationGroup{} }, + parser.COUNT: func() AggregationGroup { return NewCountGroupAggregationGroup(true) }, + parser.GROUP: func() AggregationGroup { return NewCountGroupAggregationGroup(false) }, + parser.MAX: func() AggregationGroup { return NewMinMaxAggregationGroup(true) }, + parser.MIN: func() AggregationGroup { return NewMinMaxAggregationGroup(false) }, + parser.QUANTILE: func() AggregationGroup { return &QuantileAggregationGroup{} }, + parser.STDDEV: func() AggregationGroup { return NewStddevStdvarAggregationGroup(true) }, + parser.STDVAR: func() AggregationGroup { return NewStddevStdvarAggregationGroup(false) }, + parser.SUM: func() AggregationGroup { return &SumAggregationGroup{} }, } // Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored. diff --git a/pkg/streamingpromql/operators/aggregations/count.go b/pkg/streamingpromql/operators/aggregations/count.go index 64b6d92d0d..d78bf8ac31 100644 --- a/pkg/streamingpromql/operators/aggregations/count.go +++ b/pkg/streamingpromql/operators/aggregations/count.go @@ -37,7 +37,7 @@ func (g *CountGroupAggregationGroup) groupAccumulatePoint(idx int64) { g.values[idx] = 1 } -func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc) error { +func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ types.EmitAnnotationFunc, _ uint) error { if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.values == nil { var err error // First series with values for this group, populate it. @@ -64,7 +64,7 @@ func (g *CountGroupAggregationGroup) AccumulateSeries(data types.InstantVectorSe return nil } -func (g *CountGroupAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *CountGroupAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount := 0 for _, fv := range g.values { if fv > 0 { diff --git a/pkg/streamingpromql/operators/aggregations/min_max.go b/pkg/streamingpromql/operators/aggregations/min_max.go index 53d893e7f4..4a3eafdc93 100644 --- a/pkg/streamingpromql/operators/aggregations/min_max.go +++ b/pkg/streamingpromql/operators/aggregations/min_max.go @@ -51,7 +51,7 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) { } } -func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc) error { +func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc, _ uint) error { // Native histograms are ignored for min and max. if len(data.Histograms) > 0 { emitAnnotation(func(_ string, expressionPosition posrange.PositionRange) error { @@ -90,7 +90,7 @@ func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeries return nil } -func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *MinMaxAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount := 0 for _, p := range g.floatPresent { if p { diff --git a/pkg/streamingpromql/operators/aggregations/quantile.go b/pkg/streamingpromql/operators/aggregations/quantile.go new file mode 100644 index 0000000000..a97541f28d --- /dev/null +++ b/pkg/streamingpromql/operators/aggregations/quantile.go @@ -0,0 +1,180 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package aggregations + +import ( + "context" + "math" + "unsafe" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" + "github.com/prometheus/prometheus/util/annotations" + + "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/operators/functions" + "github.com/grafana/mimir/pkg/streamingpromql/types" + "github.com/grafana/mimir/pkg/util/pool" +) + +// QuantileAggregation is a small wrapper around Aggregation to pre-process and validate +// the quantile parameter and fill it into Aggregation.ParamData +type QuantileAggregation struct { + Param types.ScalarOperator + Aggregation *Aggregation + MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + Annotations *annotations.Annotations +} + +func NewQuantileAggregation( + inner types.InstantVectorOperator, + param types.ScalarOperator, + timeRange types.QueryTimeRange, + grouping []string, + without bool, + memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + annotations *annotations.Annotations, + expressionPosition posrange.PositionRange, +) (*QuantileAggregation, error) { + + a, err := NewAggregation( + inner, + timeRange, + grouping, + without, + parser.QUANTILE, + memoryConsumptionTracker, + annotations, + expressionPosition, + ) + if err != nil { + return nil, err + } + + q := &QuantileAggregation{ + Aggregation: a, + Param: param, + MemoryConsumptionTracker: memoryConsumptionTracker, + Annotations: annotations, + } + + return q, nil +} + +func (q *QuantileAggregation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { + var err error + q.Aggregation.ParamData, err = q.Param.GetValues(ctx) + if err != nil { + return nil, err + } + // Validate the parameter now so we only have to do it once for each group + for _, p := range q.Aggregation.ParamData.Samples { + if math.IsNaN(p.F) || p.F < 0 || p.F > 1 { + q.Annotations.Add(annotations.NewInvalidQuantileWarning(p.F, q.Param.ExpressionPosition())) + } + } + + return q.Aggregation.SeriesMetadata(ctx) +} + +func (q *QuantileAggregation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { + return q.Aggregation.NextSeries(ctx) +} + +func (q *QuantileAggregation) Close() { + if q.Aggregation.ParamData.Samples != nil { + types.FPointSlicePool.Put(q.Aggregation.ParamData.Samples, q.MemoryConsumptionTracker) + } + if q.Param != nil { + q.Param.Close() + } + q.Aggregation.Close() +} + +func (q *QuantileAggregation) ExpressionPosition() posrange.PositionRange { + return q.Aggregation.ExpressionPosition() +} + +type QuantileAggregationGroup struct { + qGroups []qGroup // A group per point in time +} + +type qGroup struct { + points []float64 // All of the floats for this group of series at a point in time +} + +const maxExpectedQuantileGroups = 64 // There isn't much science to this + +var qGroupPool = types.NewLimitingBucketedPool( + pool.NewBucketedPool(maxExpectedQuantileGroups, func(size int) []qGroup { + return make([]qGroup, 0, size) + }), + uint64(unsafe.Sizeof(qGroup{})), + false, + nil, +) + +func (q *QuantileAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, remainingSeriesInGroup uint) error { + defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) + + if len(data.Histograms) > 0 { + emitAnnotationFunc(func(_ string, expressionPosition posrange.PositionRange) error { + return annotations.NewHistogramIgnoredInAggregationInfo("quantile", expressionPosition) + }) + } + + if len(data.Floats) == 0 { + // Nothing to do + return nil + } + + var err error + if q.qGroups == nil { + q.qGroups, err = qGroupPool.Get(timeRange.StepCount, memoryConsumptionTracker) + if err != nil { + return err + } + q.qGroups = q.qGroups[:timeRange.StepCount] + } + + for _, p := range data.Floats { + idx := timeRange.PointIndex(p.T) + + if q.qGroups[idx].points == nil { + q.qGroups[idx].points, err = types.Float64SlicePool.Get(int(remainingSeriesInGroup), memoryConsumptionTracker) + if err != nil { + return err + } + } + q.qGroups[idx].points = append(q.qGroups[idx].points, p.F) + } + + return nil +} + +func (q *QuantileAggregationGroup) ComputeOutputSeries(param types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { + quantilePoints, err := types.FPointSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) + if err != nil { + return types.InstantVectorSeriesData{}, false, err + } + + for i, qGroup := range q.qGroups { + if qGroup.points == nil { + // No series have any points at this time step, so nothing to output + continue + } + p := param.Samples[i].F + t := timeRange.StartT + int64(i)*timeRange.IntervalMilliseconds + f := functions.Quantile(p, qGroup.points) + quantilePoints = append(quantilePoints, promql.FPoint{T: t, F: f}) + types.Float64SlicePool.Put(qGroup.points, memoryConsumptionTracker) + q.qGroups[i].points = nil + } + + qGroupPool.Put(q.qGroups, memoryConsumptionTracker) + return types.InstantVectorSeriesData{Floats: quantilePoints}, false, nil +} diff --git a/pkg/streamingpromql/operators/aggregations/stddev_stdvar.go b/pkg/streamingpromql/operators/aggregations/stddev_stdvar.go index 92308b9569..0f02563554 100644 --- a/pkg/streamingpromql/operators/aggregations/stddev_stdvar.go +++ b/pkg/streamingpromql/operators/aggregations/stddev_stdvar.go @@ -35,7 +35,7 @@ type StddevStdvarAggregationGroup struct { groupSeriesCounts []float64 } -func (g *StddevStdvarAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc) error { +func (g *StddevStdvarAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotation types.EmitAnnotationFunc, _ uint) error { // Native histograms are ignored for stddev and stdvar. if len(data.Histograms) > 0 { emitAnnotation(func(_ string, expressionPosition posrange.PositionRange) error { @@ -83,7 +83,7 @@ func (g *StddevStdvarAggregationGroup) AccumulateSeries(data types.InstantVector return nil } -func (g *StddevStdvarAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *StddevStdvarAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount := 0 for _, sc := range g.groupSeriesCounts { if sc > 0 { diff --git a/pkg/streamingpromql/operators/aggregations/sum.go b/pkg/streamingpromql/operators/aggregations/sum.go index d3bea36146..5a2f2d47dc 100644 --- a/pkg/streamingpromql/operators/aggregations/sum.go +++ b/pkg/streamingpromql/operators/aggregations/sum.go @@ -24,7 +24,7 @@ type SumAggregationGroup struct { histogramPointCount int } -func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc) error { +func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc types.EmitAnnotationFunc, _ uint) error { defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) if len(data.Floats) == 0 && len(data.Histograms) == 0 { // Nothing to do @@ -164,7 +164,7 @@ func (g *SumAggregationGroup) reconcileAndCountFloatPoints() (int, bool) { return floatPointCount, haveMixedFloatsAndHistograms } -func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *SumAggregationGroup) ComputeOutputSeries(_ types.ScalarData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints() var floatPoints []promql.FPoint var err error diff --git a/pkg/streamingpromql/operators/functions/quantile.go b/pkg/streamingpromql/operators/functions/quantile.go index e712f60b7e..33c5e34d70 100644 --- a/pkg/streamingpromql/operators/functions/quantile.go +++ b/pkg/streamingpromql/operators/functions/quantile.go @@ -476,14 +476,14 @@ func ensureMonotonicAndIgnoreSmallDeltas(buckets buckets, tolerance float64) (bo return forcedMonotonic, fixedPrecision } -// quantile calculates the given quantile of a vector of samples. +// Quantile calculates the given quantile of a vector of samples. // // values will be sorted in place. // If values has zero elements, NaN is returned. // If q==NaN, NaN is returned. // If q<0, -Inf is returned. // If q>1, +Inf is returned. -func quantile(q float64, values []float64) float64 { +func Quantile(q float64, values []float64) float64 { if len(values) == 0 || math.IsNaN(q) { return math.NaN() } diff --git a/pkg/streamingpromql/operators/functions/range_vectors.go b/pkg/streamingpromql/operators/functions/range_vectors.go index fc7de80763..41b19acd70 100644 --- a/pkg/streamingpromql/operators/functions/range_vectors.go +++ b/pkg/streamingpromql/operators/functions/range_vectors.go @@ -728,5 +728,5 @@ func quantileOverTime(step *types.RangeVectorStepData, _ float64, args []types.S values = append(values, p.F) } - return quantile(q, values), true, nil, nil + return Quantile(q, values), true, nil, nil } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index f351a4916e..29db14633d 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -192,6 +192,21 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types } return topkbottomk.New(inner, param, timeRange, e.Grouping, e.Without, e.Op == parser.TOPK, q.memoryConsumptionTracker, q.annotations, e.PosRange), nil + case parser.QUANTILE: + param, err := q.convertToScalarOperator(e.Param, timeRange) + if err != nil { + return nil, err + } + return aggregations.NewQuantileAggregation( + inner, + param, + timeRange, + e.Grouping, + e.Without, + q.memoryConsumptionTracker, + q.annotations, + e.PosRange, + ) case parser.COUNT_VALUES: param, err := q.convertToStringOperator(e.Param) if err != nil { diff --git a/pkg/streamingpromql/testdata/ours-only/aggregators.test b/pkg/streamingpromql/testdata/ours-only/aggregators.test index 223bdf5a7e..ee189be2ed 100644 --- a/pkg/streamingpromql/testdata/ours-only/aggregators.test +++ b/pkg/streamingpromql/testdata/ours-only/aggregators.test @@ -11,3 +11,15 @@ eval range from 0 to 30m step 6m topk(scalar(param), series) series{env="prod", instance="1"} _ 4 9 20 _ _ series{env="prod", instance="2"} 2 3 10 _ _ 1 series{env="prod", instance="3"} _ 0 _ _ _ _ + +clear + +# This case currently fails with Prometheus' engine due to https://github.com/prometheus/prometheus/issues/15971. +load 6m + series{env="prod", instance="1"} 1 4 9 20 _ _ _ _ + series{env="prod", instance="2"} 2 3 10 _ _ 1 _ _ + series{env="prod", instance="3"} 0 0 8 _ _ _ _ _ + param 0.5 0.1 0.9 0.1 0.2 0.3 _ Inf + +eval_warn range from 0 to 42m step 6m quantile(scalar(param), series) + {} 1 0.6000000000000001 9.799999999999999 20 _ 1 _ _ \ No newline at end of file diff --git a/pkg/streamingpromql/testdata/ours/aggregators.test b/pkg/streamingpromql/testdata/ours/aggregators.test index 2a265d40ef..8396ff5bfd 100644 --- a/pkg/streamingpromql/testdata/ours/aggregators.test +++ b/pkg/streamingpromql/testdata/ours/aggregators.test @@ -17,6 +17,9 @@ eval range from 0 to 4m step 1m sum(some_metric) eval range from 0 to 4m step 1m avg(some_metric) {} 0 2.5 5 7.5 10 +eval range from 0 to 4m step 1m quantile(0.5, some_metric) + {} 0 2.5 5 7.5 10 + # Range query, aggregating to multiple groups with 'by'. eval range from 0 to 4m step 1m sum by (env) (some_metric) {env="prod"} 0 3 6 9 12 @@ -26,6 +29,10 @@ eval range from 0 to 4m step 1m avg by (env) (some_metric) {env="prod"} 0 1.5 3 4.5 6 {env="test"} 0 3.5 7 10.5 14 +eval range from 0 to 4m step 1m quantile by (env) (0.5, some_metric) + {env="prod"} 0 1.5 3 4.5 6 + {env="test"} 0 3.5 7 10.5 14 + # Range query, aggregating to multiple groups with 'without'. eval range from 0 to 4m step 1m sum without (env) (some_metric) {cluster="eu"} 0 4 8 12 16 @@ -35,6 +42,10 @@ eval range from 0 to 4m step 1m avg without (env) (some_metric) {cluster="eu"} 0 2 4 6 8 {cluster="us"} 0 3 6 9 12 +eval range from 0 to 4m step 1m quantile without (env) (0.5, some_metric) + {cluster="eu"} 0 2 4 6 8 + {cluster="us"} 0 3 6 9 12 + # Range query, aggregating to a single group with 'without'. eval range from 0 to 4m step 1m sum without (env, cluster) (some_metric) {} 0 10 20 30 40 @@ -42,6 +53,9 @@ eval range from 0 to 4m step 1m sum without (env, cluster) (some_metric) eval range from 0 to 4m step 1m avg without (env, cluster) (some_metric) {} 0 2.5 5 7.5 10 +eval range from 0 to 4m step 1m quantile without (env, cluster) (0.5, some_metric) + {} 0 2.5 5 7.5 10 + # 'without' should always ignore the metric name. eval range from 0 to 4m step 1m sum without(cluster) ({cluster="us"}) {env="prod"} 0 2 4 6 8 @@ -51,6 +65,10 @@ eval range from 0 to 4m step 1m avg without(cluster) ({cluster="us"}) {env="prod"} 0 2 4 6 8 {env="test"} 0 4.5 9 13.5 18 +eval range from 0 to 4m step 1m quantile without(cluster) (0.5, {cluster="us"}) + {env="prod"} 0 2 4 6 8 + {env="test"} 0 4.5 9 13.5 18 + # If no series are matched, we shouldn't return any results. eval range from 0 to 4m step 1m sum(some_nonexistent_metric) # Should return no results. @@ -80,6 +98,13 @@ eval range from 0 to 4m step 1m avg without(env) (some_metric) {cluster="us", group="a", subgroup="1"} 0 4 8 12 16 {cluster="us", group="a", subgroup="2"} 0 5.5 11 16.5 22 +eval range from 0 to 4m step 1m quantile without(env) (0.5, some_metric) + {cluster="eu", group="a", subgroup="1"} 0 1 2 3 4 + {cluster="eu", group="a", subgroup="2"} 0 2 4 6 8 + {cluster="eu", group="b", subgroup="1"} 0 3 6 9 12 + {cluster="us", group="a", subgroup="1"} 0 4 8 12 16 + {cluster="us", group="a", subgroup="2"} 0 5.5 11 16.5 22 + eval range from 0 to 4m step 1m sum without(env, cluster) (some_metric) {group="a", subgroup="1"} 0 5 10 15 20 {group="a", subgroup="2"} 0 13 26 39 52 @@ -90,6 +115,11 @@ eval range from 0 to 4m step 1m avg without(env, cluster) (some_metric) {group="a", subgroup="2"} 0 4.333333333333333 8.666666666666666 13 17.333333333333332 {group="b", subgroup="1"} 0 3 6 9 12 +eval range from 0 to 4m step 1m quantile without(env, cluster) (0.5, some_metric) + {group="a", subgroup="1"} 0 2.5 5 7.5 10 + {group="a", subgroup="2"} 0 5 10 15 20 + {group="b", subgroup="1"} 0 3 6 9 12 + # 'without' with duplicate labels to remove. eval range from 0 to 4m step 1m sum without(env, cluster, env) (some_metric) {group="a", subgroup="1"} 0 5 10 15 20 @@ -101,6 +131,11 @@ eval range from 0 to 4m step 1m avg without(env, cluster, env) (some_metric) {group="a", subgroup="2"} 0 4.333333333333333 8.666666666666666 13 17.333333333333332 {group="b", subgroup="1"} 0 3 6 9 12 +eval range from 0 to 4m step 1m quantile without(env, cluster, env) (0.5, some_metric) + {group="a", subgroup="1"} 0 2.5 5 7.5 10 + {group="a", subgroup="2"} 0 5 10 15 20 + {group="b", subgroup="1"} 0 3 6 9 12 + # 'by' with duplicate grouping labels. eval range from 0 to 4m step 1m sum by(env, cluster, env) (some_metric) {cluster="eu", env="prod"} 0 6 12 18 24 @@ -112,6 +147,11 @@ eval range from 0 to 4m step 1m avg by(env, cluster, env) (some_metric) {cluster="us", env="prod"} 0 4.5 9 13.5 18 {cluster="us", env="test"} 0 6 12 18 24 +eval range from 0 to 4m step 1m quantile by(env, cluster, env) (0.5, some_metric) + {cluster="eu", env="prod"} 0 2 4 6 8 + {cluster="us", env="prod"} 0 4.5 9 13.5 18 + {cluster="us", env="test"} 0 6 12 18 24 + clear load 1m @@ -124,6 +164,9 @@ eval range from 1m to 1m30s step 1s sum(some_metric_with_staleness) eval range from 1m to 1m30s step 1s avg(some_metric_with_staleness) # Should return no results. +eval range from 1m to 1m30s step 1s quantile(0.5, some_metric_with_staleness) + # Should return no results. + clear # Test native histogram aggregations @@ -138,6 +181,8 @@ eval instant at 0m sum(single_histogram) eval instant at 0m avg(single_histogram) {} {{schema:0 sum:5.666666666666667 count:6 buckets:[1.6666666666666667 4 0.33333333333333337]}} +eval_info instant at 0m quantile(0.5, single_histogram) + eval instant at 0m sum by (label) (single_histogram) {label="value"} {{schema:0 count:4 sum:2 buckets:[1 2 1]}} {label="value2"} {{schema:1 count:14 sum:15 buckets:[4 6 4]}} @@ -171,20 +216,24 @@ load 1m # See: https://github.com/prometheus/prometheus/issues/14172 # What I would expect -# eval range from 0 to 4m step 1m sum by (label) (single_histogram) +# eval range from 0 to 4m step 1m sum by (label) (single_histogram) # {label="value"} 0 1 {{count:4 sum:2 buckets:[1 2 1]}} {{sum:2 count:4 buckets:[1 2 1]}} 2 # {label="value2"} 0 5 {{schema:1 count:14 sum:15 buckets:[4 6 4]}} {{schema:2 count:8 sum:4 buckets:[4 6 4]}} 10 # -# eval range from 0 to 4m step 1m avg by (label) (single_histogram) +# eval range from 0 to 4m step 1m avg by (label) (single_histogram) # {label="value"} 0 1 {{count:4 sum:2 buckets:[1 2 1]}} {{sum:2 count:4 buckets:[1 2 1]}} 2 # {label="value2"} 0 2.5 {{schema:1 count:7 sum:7.5 buckets:[2 3 2]}} {{schema:2 count:4 sum:2 buckets:[1 2 1]}} 5 # What both engines return -eval range from 0 to 4m step 1m sum by (label) (single_histogram) +eval range from 0 to 4m step 1m sum by (label) (single_histogram) {label="value"} 0 1 1 1 2 {label="value2"} 0 5 5 5 10 -eval range from 0 to 4m step 1m avg by (label) (single_histogram) +eval range from 0 to 4m step 1m avg by (label) (single_histogram) + {label="value"} 0 1 1 1 2 + {label="value2"} 0 2.5 2.5 2.5 5 + +eval range from 0 to 4m step 1m quantile by (label) (0.5, single_histogram) {label="value"} 0 1 1 1 2 {label="value2"} 0 2.5 2.5 2.5 5 @@ -197,12 +246,15 @@ load 1m # If a float is present, the histogram is ignored. # If a float comes after a histogram, a lookback'd float is used instead of the histogram (see: https://github.com/prometheus/prometheus/issues/14172) -eval range from 0 to 5m step 1m sum(single_histogram) +eval range from 0 to 5m step 1m sum(single_histogram) {} 0 1 1 5 8 {{sum:4 count:8 buckets:[2 4 2]}} -eval range from 0 to 5m step 1m avg(single_histogram) +eval range from 0 to 5m step 1m avg(single_histogram) {} 0 0.5 0.5 2.5 4 {{sum:2 count:4 buckets:[1 2 1]}} +eval_info range from 0 to 5m step 1m quantile(0.5, single_histogram) + {} 0 0.5 0.5 2.5 4 + clear # Test a mix of float and histogram values at the same point @@ -215,6 +267,9 @@ eval_warn instant at 1m sum(single_histogram) eval_warn instant at 1m avg(single_histogram) +eval_info instant at 1m quantile(0.5, single_histogram) + {} 3 + clear # Test a mix of float and histogram values at the same point @@ -227,6 +282,9 @@ eval_warn instant at 1m sum(single_histogram) eval_warn instant at 1m avg(single_histogram) +eval_info instant at 1m quantile(0.5, single_histogram) + {} 3 + clear # Test a mix of float and histogram values at the same point, where after adding 2 conflicting series and removing a point, @@ -241,6 +299,9 @@ eval_warn instant at 1m sum(single_histogram) eval_warn instant at 1m avg(single_histogram) +eval_info instant at 1m quantile(0.5, single_histogram) + {} 2 + clear # Test min/max aggregation with histograms and a mix of histogram+float values @@ -625,3 +686,26 @@ eval range from 0 to 12m step 6m count_values by (idx) ("idx", series) # Once that fix is vendored into Mimir, we can remove the two (\\")? groups below. eval_fail instant at 0 count_values("a\xc5z", series) expected_fail_regexp invalid label name "(\\")?a\\(\\)?xc5z(\\")?"( for count_values)? + +clear + +load 6m + series{idx="1"} 1 2 3 4 5 6 + series{idx="2"} 4 5 6 7 8 9 + series{idx="3"} 7 8 Inf NaN -Inf + +# Quantile value warning is emitted even when no series are returned +eval_warn range from 0 to 12m step 6m quantile(20, noseries) + +eval_warn range from 0 to 12m step 6m quantile(Inf, noseries) + +eval_warn range from 0 to 12m step 6m quantile(-Inf, noseries) + +eval range from 0 to 30m step 6m quantile(0.9, series) + {} 6.4 7.4 +Inf 6.4 7.4 8.7 + +eval range from 0 to 30m step 6m quantile(0, series) + {} 1 2 3 NaN -Inf 6 + +eval range from 0 to 30m step 6m quantile(1, series) + {} 7 8 NaN 7 8 9 diff --git a/pkg/streamingpromql/testdata/upstream/aggregators.test b/pkg/streamingpromql/testdata/upstream/aggregators.test index 5f657a647f..223b6dc8ad 100644 --- a/pkg/streamingpromql/testdata/upstream/aggregators.test +++ b/pkg/streamingpromql/testdata/upstream/aggregators.test @@ -450,42 +450,36 @@ load 10s data_histogram{test="histogram sample", point="c"} {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}} foo .8 -# Unsupported by streaming engine. -# eval instant at 1m quantile without(point)(0.8, data) -# {test="two samples"} 0.8 -# {test="three samples"} 1.6 -# {test="uneven samples"} 2.8 +eval instant at 1m quantile without(point)(0.8, data) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 # The histogram is ignored here so the result doesn't change but it has an info annotation now. -# Unsupported by streaming engine. -# eval_info instant at 1m quantile without(point)(0.8, {__name__=~"data(_histogram)?"}) -# {test="two samples"} 0.8 -# {test="three samples"} 1.6 -# {test="uneven samples"} 2.8 +eval_info instant at 1m quantile without(point)(0.8, {__name__=~"data(_histogram)?"}) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 # The histogram is ignored here so there is no result but it has an info annotation now. -# Unsupported by streaming engine. -# eval_info instant at 1m quantile(0.8, data_histogram) +eval_info instant at 1m quantile(0.8, data_histogram) # Bug #5276. -# Unsupported by streaming engine. -# eval instant at 1m quantile without(point)(scalar(foo), data) -# {test="two samples"} 0.8 -# {test="three samples"} 1.6 -# {test="uneven samples"} 2.8 - - -# Unsupported by streaming engine. -# eval instant at 1m quantile without(point)((scalar(foo)), data) -# {test="two samples"} 0.8 -# {test="three samples"} 1.6 -# {test="uneven samples"} 2.8 - -# Unsupported by streaming engine. -# eval_warn instant at 1m quantile without(point)(NaN, data) -# {test="two samples"} NaN -# {test="three samples"} NaN -# {test="uneven samples"} NaN +eval instant at 1m quantile without(point)(scalar(foo), data) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 + + +eval instant at 1m quantile without(point)((scalar(foo)), data) + {test="two samples"} 0.8 + {test="three samples"} 1.6 + {test="uneven samples"} 2.8 + +eval_warn instant at 1m quantile without(point)(NaN, data) + {test="two samples"} NaN + {test="three samples"} NaN + {test="uneven samples"} NaN # Tests for group. clear