Skip to content

Commit

Permalink
MQE: add support for quantile_over_time (#10629)
Browse files Browse the repository at this point in the history
* Enable upstream test cases

* Add implementation of `quantile_over_time`

* Add test cases for edge cases not covered by upstream tests

* Remove `mad_over_time` test case

* Address PR feedback: remove duplicate check

* Add to gauntlet

* Address PR feedback: add test with varying quantile over time
  • Loading branch information
charleskorn authored and ying-jeanne committed Feb 20, 2025
1 parent fa71e28 commit 12ae326
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 73 deletions.
3 changes: 3 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "sum_over_time(nh_X[1m])",
},
{
Expr: "quantile_over_time(0.3, a_X[1m])",
},
//{
// Expr: "absent_over_time(a_X[1d])",
//},
Expand Down
49 changes: 46 additions & 3 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
}

for expression, expectedError := range unsupportedExpressions {
Expand Down Expand Up @@ -2286,6 +2285,49 @@ func TestAnnotations(t *testing.T) {
},
},

"quantile_over_time() with negative quantile": {
data: `metric 0 1 2 3`,
expr: `quantile_over_time(-1, metric[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: quantile value should be between 0 and 1, got -1 (1:20)`,
},
},
"quantile_over_time() with 0 quantile": {
data: `some_metric 0 1 2 3`,
expr: `quantile_over_time(0, some_metric[1m1s])`,
},
"quantile_over_time() with quantile between 0 and 1": {
data: `some_metric 0 1 2 3`,
expr: `quantile_over_time(0.5, some_metric[1m1s])`,
},
"quantile_over_time() with 1 quantile": {
data: `some_metric 0 1 2 3`,
expr: `quantile_over_time(1, some_metric[1m1s])`,
},
"quantile_over_time() with quantile greater than 1": {
data: `some_metric 0 1 2 3`,
expr: `quantile_over_time(1.2, some_metric[1m1s])`,
expectedWarningAnnotations: []string{
`PromQL warning: quantile value should be between 0 and 1, got 1.2 (1:20)`,
},
},
"quantile_over_time() over series with only floats": {
data: `some_metric 1 2`,
expr: `quantile_over_time(0.2, some_metric[1m1s])`,
},
"quantile_over_time() over series with only histograms": {
data: `some_metric {{count:1}} {{count:2}}`,
expr: `quantile_over_time(0.2, some_metric[1m1s])`,
},
"quantile_over_time() over series with both floats and histograms": {
data: `some_metric 1 {{count:2}}`,
expr: `quantile_over_time(0.2, some_metric[1m1s])`,
expectedInfoAnnotations: []string{
`PromQL info: ignored histograms in a range containing both floats and histograms for metric name "some_metric" (1:20)`,
},
skipComparisonWithPrometheusReason: "Prometheus' engine emits the wrong annotation, see https://github.com/prometheus/prometheus/pull/16018",
},

"multiple annotations from different operators": {
data: `
mixed_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}
Expand Down Expand Up @@ -2986,6 +3028,7 @@ func TestCompareVariousMixedMetricsVectorSelectors(t *testing.T) {
}

expressions = append(expressions, fmt.Sprintf(`predict_linear(series{label=~"(%s)"}[1m], 30)`, labelRegex))
expressions = append(expressions, fmt.Sprintf(`quantile_over_time(scalar(series{label="i"}), series{label=~"(%s)"}[1m])`, labelRegex))
}

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
Expand Down
30 changes: 30 additions & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,35 @@ func PredictLinearFactory(args []types.Operator, memoryConsumptionTracker *limit
return o, nil
}

func QuantileOverTimeFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
f := functions.QuantileOverTime

if len(args) != 2 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 2 arguments for quantile_over_time, got %v", len(args))
}

arg, ok := args[0].(types.ScalarOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected first argument for quantile_over_time to be a scalar, got %T", args[1])
}

inner, ok := args[1].(types.RangeVectorOperator)
if !ok {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected second argument for quantile_over_time to be a range vector, got %T", args[0])
}

var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, []types.ScalarOperator{arg}, memoryConsumptionTracker, f, annotations, expressionPosition, timeRange)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
}

return o, nil
}

func scalarToInstantVectorOperatorFactory(args []types.Operator, _ *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
Expand Down Expand Up @@ -503,6 +532,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"month": TimeTransformationFunctionOperatorFactory("month", functions.Month),
"predict_linear": PredictLinearFactory,
"present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime),
"quantile_over_time": QuantileOverTimeFactory,
"rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
"rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
"resets": FunctionOverRangeVectorOperatorFactory("resets", functions.Resets),
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestFunctionDeduplicateAndMerge(t *testing.T) {
"month": `month({__name__=~"float.*"})`,
"predict_linear": `predict_linear({__name__=~"float.*"}[1m], 30)`,
"present_over_time": `present_over_time({__name__=~"float.*"}[1m])`,
"quantile_over_time": `quantile_over_time(0.5, {__name__=~"float.*"}[1m])`,
"rad": `rad({__name__=~"float.*"})`,
"rate": `rate({__name__=~"float.*"}[1m])`,
"resets": `resets({__name__=~"float.*"}[1m])`,
Expand Down
6 changes: 6 additions & 0 deletions pkg/streamingpromql/operators/functions/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type RangeVectorStepFunction func(
scalarArgsData []types.ScalarData,
timeRange types.QueryTimeRange,
emitAnnotation types.EmitAnnotationFunc,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error)

// RangeVectorSeriesValidationFunction is a function that is called after a series is completed for a function over a range vector.
Expand Down Expand Up @@ -115,6 +116,11 @@ type FunctionOverRangeVectorDefinition struct {

// NeedsSeriesNamesForAnnotations indicates that this function uses the names of input series when emitting annotations.
NeedsSeriesNamesForAnnotations bool

// UseFirstArgumentPositionForAnnotations indicates that annotations emitted by this function should use the position of the
// first argument, not the position of the inner expression.
// FIXME: we might need something more flexible in the future (eg. to accommodate other argument positions), but this is good enough for now.
UseFirstArgumentPositionForAnnotations bool
}

type SeriesMetadataFunctionDefinition struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
return types.InstantVectorSeriesData{}, err
}

f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.scalarArgsData, m.timeRange, m.emitAnnotationFunc)
f, hasFloat, h, err := m.Func.StepFunc(step, m.rangeSeconds, m.scalarArgsData, m.timeRange, m.emitAnnotationFunc, m.MemoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand Down Expand Up @@ -179,7 +179,13 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant

func (m *FunctionOverRangeVector) emitAnnotation(generator types.AnnotationGenerator) {
metricName := m.metricNames.GetMetricNameForSeries(m.currentSeriesIndex)
m.Annotations.Add(generator(metricName, m.Inner.ExpressionPosition()))
pos := m.Inner.ExpressionPosition()

if m.Func.UseFirstArgumentPositionForAnnotations {
pos = m.ScalarArgs[0].ExpressionPosition()
}

m.Annotations.Add(generator(metricName, pos))
}

func (m *FunctionOverRangeVector) Close() {
Expand Down
31 changes: 31 additions & 0 deletions pkg/streamingpromql/operators/functions/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,34 @@ func ensureMonotonicAndIgnoreSmallDeltas(buckets buckets, tolerance float64) (bo
}
return forcedMonotonic, fixedPrecision
}

// quantile calculates the given quantile of a vector of samples.
//
// values will be sorted in place.
// If values has zero elements, NaN is returned.
// If q==NaN, NaN is returned.
// If q<0, -Inf is returned.
// If q>1, +Inf is returned.
func quantile(q float64, values []float64) float64 {
if len(values) == 0 || math.IsNaN(q) {
return math.NaN()
}
if q < 0 {
return math.Inf(-1)
}
if q > 1 {
return math.Inf(+1)
}
slices.Sort(values)

n := float64(len(values))
// When the quantile lies between two samples,
// we use a weighted average of the two samples.
rank := q * (n - 1)

lowerIndex := math.Max(0, math.Floor(rank))
upperIndex := math.Min(n-1, lowerIndex+1)

weight := rank - math.Floor(rank)
return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight
}
68 changes: 56 additions & 12 deletions pkg/streamingpromql/operators/functions/range_vectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/streamingpromql/floats"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

Expand All @@ -21,7 +23,7 @@ var CountOverTime = FunctionOverRangeVectorDefinition{
StepFunc: countOverTime,
}

func countOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func countOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fPointCount := step.Floats.Count()
hPointCount := step.Histograms.Count()

Expand All @@ -37,7 +39,7 @@ var LastOverTime = FunctionOverRangeVectorDefinition{
StepFunc: lastOverTime,
}

func lastOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func lastOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
lastFloat, floatAvailable := step.Floats.Last()
lastHistogram, histogramAvailable := step.Histograms.Last()

Expand All @@ -58,7 +60,7 @@ var PresentOverTime = FunctionOverRangeVectorDefinition{
StepFunc: presentOverTime,
}

func presentOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func presentOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
if step.Floats.Any() || step.Histograms.Any() {
return 1, true, nil, nil
}
Expand All @@ -72,7 +74,7 @@ var MaxOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func maxOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func maxOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
Expand Down Expand Up @@ -107,7 +109,7 @@ var MinOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func minOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func minOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
Expand Down Expand Up @@ -142,7 +144,7 @@ var SumOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func sumOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func sumOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -207,7 +209,7 @@ var AvgOverTime = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func avgOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func avgOverTime(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -357,7 +359,7 @@ var Resets = FunctionOverRangeVectorDefinition{
}

func resetsChanges(isReset bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()
hHead, hTail := step.Histograms.UnsafePoints()

Expand Down Expand Up @@ -483,7 +485,7 @@ var Deriv = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func deriv(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func deriv(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()

if step.Floats.Any() && step.Histograms.Any() {
Expand All @@ -505,7 +507,7 @@ var PredictLinear = FunctionOverRangeVectorDefinition{
NeedsSeriesNamesForAnnotations: true,
}

func predictLinear(step *types.RangeVectorStepData, _ float64, args []types.ScalarData, timeRange types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
func predictLinear(step *types.RangeVectorStepData, _ float64, args []types.ScalarData, timeRange types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
fHead, fTail := step.Floats.UnsafePoints()

if step.Floats.Any() && step.Histograms.Any() {
Expand Down Expand Up @@ -584,7 +586,7 @@ var Idelta = FunctionOverRangeVectorDefinition{
}

func irateIdelta(isRate bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, _ types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
// Histograms are ignored
fHead, fTail := step.Floats.UnsafePoints()

Expand Down Expand Up @@ -648,7 +650,7 @@ var StdvarOverTime = FunctionOverRangeVectorDefinition{
}

func stddevStdvarOverTime(isStdDev bool) RangeVectorStepFunction {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
return func(step *types.RangeVectorStepData, _ float64, _ []types.ScalarData, _ types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, _ *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
head, tail := step.Floats.UnsafePoints()

if len(head) == 0 && len(tail) == 0 {
Expand Down Expand Up @@ -686,3 +688,45 @@ func stddevStdvarOverTime(isStdDev bool) RangeVectorStepFunction {
return result, true, nil, nil
}
}

var QuantileOverTime = FunctionOverRangeVectorDefinition{
SeriesMetadataFunction: DropSeriesName,
StepFunc: quantileOverTime,
NeedsSeriesNamesForAnnotations: true,
UseFirstArgumentPositionForAnnotations: true,
}

func quantileOverTime(step *types.RangeVectorStepData, _ float64, args []types.ScalarData, timeRange types.QueryTimeRange, emitAnnotation types.EmitAnnotationFunc, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (float64, bool, *histogram.FloatHistogram, error) {
if !step.Floats.Any() {
return 0, false, nil, nil
}

if step.Histograms.Any() {
emitAnnotation(annotations.NewHistogramIgnoredInMixedRangeInfo)
}

q := args[0].Samples[timeRange.PointIndex(step.StepT)].F
if math.IsNaN(q) || q < 0 || q > 1 {
emitAnnotation(func(_ string, expressionPosition posrange.PositionRange) error {
return annotations.NewInvalidQuantileWarning(q, expressionPosition)
})
}

head, tail := step.Floats.UnsafePoints()
values, err := types.Float64SlicePool.Get(len(head)+len(tail), memoryConsumptionTracker)
if err != nil {
return 0, false, nil, err
}

defer types.Float64SlicePool.Put(values, memoryConsumptionTracker)

for _, p := range head {
values = append(values, p.F)
}

for _, p := range tail {
values = append(values, p.F)
}

return quantile(q, values), true, nil, nil
}
Loading

0 comments on commit 12ae326

Please sign in to comment.