MQE: add support for count_values (#10744)
* Enable upstream tests

* Add to gauntlet

* Remove from unsupported expressions test

* Enable benchmark

* Initial implementation

* Avoid allocations when formatting value label where possible

* Pool `countValuesSeries` instances

* Add additional benchmark

* Address PR feedback: don't export `ToPoints`

* Update comment and move group manipulation to `NewCountValues`
charleskorn authored Mar 3, 2025
1 parent c552abe commit 0394438
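
For context: count_values("value", x) counts, at each timestamp, how many input series share each distinct sample value, and emits one output series per distinct value, with that value stored in the requested label. The "Avoid allocations when formatting value label where possible" item above refers to formatting those values by appending into a reused byte buffer with strconv.AppendFloat rather than calling strconv.FormatFloat for every sample. Below is a minimal, self-contained sketch of that idiom; it uses only the standard library, and the names valueBuffer and formatFloat are illustrative (the committed code is formatFloatValue in count_values.go further down).

package main

import (
	"fmt"
	"strconv"
)

// valueBuffer is reused across calls: the backing array survives, so formatting
// a float does not allocate a fresh buffer each time. Only the final string
// conversion allocates.
var valueBuffer []byte

func formatFloat(f float64) string {
	valueBuffer = valueBuffer[:0] // reset the length, keep the capacity
	valueBuffer = strconv.AppendFloat(valueBuffer, f, 'f', -1, 64)
	return string(valueBuffer)
}

func main() {
	fmt.Println(formatFloat(0.5), formatFloat(123456.789)) // prints: 0.5 123456.789
}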
Showing 7 changed files with 573 additions and 39 deletions.
12 changes: 8 additions & 4 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -258,10 +258,14 @@ func TestCases(metricSizes []int) []BenchCase {
 		{
 			Expr: "avg by (l)(nh_X)",
 		},
-		//{
-		//	Expr: "count_values('value', h_X)",
-		//	Steps: 100,
-		//},
+		{
+			Expr:  "count_values('value', h_X)", // Every sample has a different value, so this expression will produce X * 100 output series.
+			Steps: 100,
+		},
+		{
+			Expr:  "count_values('value', h_X * 0)", // Every sample has the same value (0), so this expression will produce 1 series.
+			Steps: 100,
+		},
 		{
 			Expr: "topk(1, a_X)",
 		},
5 changes: 3 additions & 2 deletions pkg/streamingpromql/engine_test.go
@@ -50,8 +50,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
 	// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
 	// different cases and make sure we produce a reasonable error message when these cases are encountered.
 	unsupportedExpressions := map[string]string{
-		`count_values("foo", metric{})`: "'count_values' aggregation with parameter",
-		"quantile(0.95, metric{})":      "'quantile' aggregation with parameter",
+		"quantile(0.95, metric{})": "'quantile' aggregation with parameter",
 	}
 
 	for expression, expectedError := range unsupportedExpressions {
@@ -2997,6 +2996,8 @@ func TestCompareVariousMixedMetricsAggregations(t *testing.T) {
 			expressions = append(expressions, fmt.Sprintf(`%s by (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
 			expressions = append(expressions, fmt.Sprintf(`%s without (group) (series{label=~"(%s)"})`, aggFunc, labelRegex))
 		}
+
+		expressions = append(expressions, fmt.Sprintf(`count_values("value", series{label="%s"})`, labelRegex))
 	}
 
 	runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
248 changes: 248 additions & 0 deletions pkg/streamingpromql/operators/aggregations/count_values.go
@@ -0,0 +1,248 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package aggregations

import (
	"context"
	"fmt"
	"slices"
	"strconv"
	"sync"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser/posrange"

	"github.com/grafana/mimir/pkg/streamingpromql/limiting"
	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type CountValues struct {
	Inner                    types.InstantVectorOperator
	LabelName                types.StringOperator
	TimeRange                types.QueryTimeRange
	Grouping                 []string // If this is a 'without' aggregation, NewCountValues will ensure that this slice contains __name__.
	Without                  bool
	MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

	expressionPosition posrange.PositionRange

	resolvedLabelName string

	series [][]promql.FPoint

	// Reuse instances used to generate series labels rather than recreating them every time.
	labelsBuilder     *labels.Builder
	labelsBytesBuffer []byte
	valueBuffer       []byte
}

var _ types.InstantVectorOperator = &CountValues{}

func NewCountValues(
	inner types.InstantVectorOperator,
	labelName types.StringOperator,
	timeRange types.QueryTimeRange,
	grouping []string,
	without bool,
	memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
	expressionPosition posrange.PositionRange,
) *CountValues {
	if without {
		grouping = append(grouping, labels.MetricName)
	}

	slices.Sort(grouping)

	return &CountValues{
		Inner:                    inner,
		LabelName:                labelName,
		TimeRange:                timeRange,
		Grouping:                 grouping,
		Without:                  without,
		MemoryConsumptionTracker: memoryConsumptionTracker,
		expressionPosition:       expressionPosition,
	}
}

type countValuesSeries struct {
	labels           labels.Labels
	outputPointCount int
	count            []int // One entry per timestamp.
}

var countValuesSeriesPool = sync.Pool{
	New: func() interface{} {
		return &countValuesSeries{}
	},
}

func (c *CountValues) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
	if err := c.loadLabelName(); err != nil {
		return nil, err
	}

	innerMetadata, err := c.Inner.SeriesMetadata(ctx)
	if err != nil {
		return nil, err
	}

	defer types.PutSeriesMetadataSlice(innerMetadata)

	c.labelsBuilder = labels.NewBuilder(labels.EmptyLabels())
	c.labelsBytesBuffer = make([]byte, 0, 1024) // Why 1024 bytes? It's what labels.Labels.String() uses as a buffer size, so we use that as a sensible starting point too.
	defer func() {
		// Don't hold onto the instances used to generate series labels for longer than necessary.
		c.labelsBuilder = nil
		c.labelsBytesBuffer = nil
		c.valueBuffer = nil
	}()

	accumulator := map[string]*countValuesSeries{}

	for _, s := range innerMetadata {
		data, err := c.Inner.NextSeries(ctx)
		if err != nil {
			return nil, err
		}

		for _, p := range data.Floats {
			if err := c.incrementCount(s.Labels, p.T, c.formatFloatValue(p.F), accumulator); err != nil {
				return nil, err
			}
		}

		for _, p := range data.Histograms {
			if err := c.incrementCount(s.Labels, p.T, p.H.String(), accumulator); err != nil {
				return nil, err
			}
		}

		types.PutInstantVectorSeriesData(data, c.MemoryConsumptionTracker)
	}

	outputMetadata := types.GetSeriesMetadataSlice(len(accumulator))
	c.series = make([][]promql.FPoint, 0, len(accumulator))

	for _, s := range accumulator {
		outputMetadata = append(outputMetadata, types.SeriesMetadata{Labels: s.labels})

		points, err := s.toPoints(c.MemoryConsumptionTracker, c.TimeRange)
		if err != nil {
			return nil, err
		}

		c.series = append(c.series, points)

		types.IntSlicePool.Put(s.count, c.MemoryConsumptionTracker)
		s.count = nil
		countValuesSeriesPool.Put(s)
	}

	return outputMetadata, nil
}

func (c *CountValues) loadLabelName() error {
	c.resolvedLabelName = c.LabelName.GetValue()
	if !model.LabelName(c.resolvedLabelName).IsValid() {
		return fmt.Errorf("invalid label name %q for count_values", c.resolvedLabelName)
	}

	return nil
}

func (c *CountValues) formatFloatValue(f float64) string {
	// Using AppendFloat like this (rather than FormatFloat) allows us to reuse the buffer used for formatting the string,
	// rather than creating a new one for every value.
	c.valueBuffer = c.valueBuffer[:0]
	c.valueBuffer = strconv.AppendFloat(c.valueBuffer, f, 'f', -1, 64)

	return string(c.valueBuffer)
}

func (c *CountValues) incrementCount(seriesLabels labels.Labels, t int64, value string, accumulator map[string]*countValuesSeries) error {
	l := c.computeOutputLabels(seriesLabels, value)
	c.labelsBytesBuffer = l.Bytes(c.labelsBytesBuffer)
	series, exists := accumulator[string(c.labelsBytesBuffer)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it.

	if !exists {
		series = countValuesSeriesPool.Get().(*countValuesSeries)
		series.labels = l
		series.outputPointCount = 0

		var err error
		series.count, err = types.IntSlicePool.Get(c.TimeRange.StepCount, c.MemoryConsumptionTracker)
		if err != nil {
			return err
		}

		series.count = series.count[:c.TimeRange.StepCount]
		accumulator[string(c.labelsBytesBuffer)] = series
	}

	idx := c.TimeRange.PointIndex(t)

	if series.count[idx] == 0 {
		series.outputPointCount++
	}

	series.count[idx]++

	return nil
}

func (c *CountValues) computeOutputLabels(seriesLabels labels.Labels, value string) labels.Labels {
	c.labelsBuilder.Reset(seriesLabels)

	if c.Without {
		c.labelsBuilder.Del(c.Grouping...)
	} else {
		c.labelsBuilder.Keep(c.Grouping...)
	}

	c.labelsBuilder.Set(c.resolvedLabelName, value)

	return c.labelsBuilder.Labels()
}

func (s *countValuesSeries) toPoints(memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) ([]promql.FPoint, error) {
	p, err := types.FPointSlicePool.Get(s.outputPointCount, memoryConsumptionTracker)
	if err != nil {
		return nil, err
	}

	for idx, count := range s.count {
		if count == 0 {
			continue
		}

		t := timeRange.IndexTime(int64(idx))
		p = append(p, promql.FPoint{T: t, F: float64(count)})
	}

	return p, nil
}

func (c *CountValues) NextSeries(_ context.Context) (types.InstantVectorSeriesData, error) {
	if len(c.series) == 0 {
		return types.InstantVectorSeriesData{}, types.EOS
	}

	p := c.series[0]
	c.series = c.series[1:]

	return types.InstantVectorSeriesData{Floats: p}, nil
}

func (c *CountValues) ExpressionPosition() posrange.PositionRange {
	return c.expressionPosition
}

func (c *CountValues) Close() {
	c.Inner.Close()
	c.LabelName.Close()
}