Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP: Use optimized OTel metric translator, converting directly to Mimir format #7957

Merged
merged 12 commits into from
May 30, 2024
Merged
3 changes: 2 additions & 1 deletion .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ jobs:
# Determine if we will deploy (aka push) the image to the registry.
is_deploy: ${{ (startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/r')) && github.event_name == 'push' && github.repository == 'grafana/mimir' }}


goversion:
runs-on: ubuntu-latest
needs: prepare
Expand Down Expand Up @@ -103,6 +102,8 @@ jobs:
run: make BUILD_IN_CONTAINER=false check-license
- name: Check Docker-Compose YAML
run: make BUILD_IN_CONTAINER=false check-mimir-microservices-mode-docker-compose-yaml check-mimir-read-write-mode-docker-compose-yaml
- name: Check Generated OTLP Code
run: make BUILD_IN_CONTAINER=false check-generated-otlp-code

doc-validator:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. #7957
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ images: ## Print all image names.
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# Generating OTLP translation code is automated.
OTLP_GOS := $(shell find ./pkg/distributor/otlp/ -type f -name '*_generated.go' -print)

# Building binaries is now automated. The convention is to build a binary
# for every directory with main.go in it.
MAIN_GO := $(shell find . $(DONT_FIND) -type f -name 'main.go' -print)
Expand Down Expand Up @@ -771,3 +774,11 @@ test-packages: packages packaging/rpm/centos-systemd/$(UPTODATE) packaging/deb/d

docs: doc
cd docs && $(MAKE) docs

.PHONY: generate-otlp
generate-otlp:
cd pkg/distributor/otlp && rm *_generated.go && go generate

.PHONY: check-generated-otlp-code
check-generated-otlp-code: generate-otlp
@./tools/find-diff-or-untracked.sh $(OTLP_GOS) || (echo "Please rebuild OTLP code by running 'generate-otlp'" && false)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ require (
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down Expand Up @@ -237,7 +237,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.uber.org/zap v1.21.0 // indirect
Expand Down
100 changes: 5 additions & 95 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"

"github.com/grafana/mimir/pkg/distributor/otlp"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
Expand Down Expand Up @@ -259,11 +259,11 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
}

func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
errs := converter.FromMetrics(md, prometheusremotewrite.Settings{
converter := otlp.NewMimirConverter()
errs := converter.FromMetrics(md, otlp.Settings{
AddMetricSuffixes: addSuffixes,
})
promTS := converter.TimeSeries()
mimirTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed
Expand All @@ -273,106 +273,16 @@ func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOt
parseErrs = parseErrs[:maxErrMsgLen]
}

if len(promTS) == 0 {
if len(mimirTS) == 0 {
return nil, errors.New(parseErrs)
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
}

mimirTS := mimirpb.PreallocTimeseriesSliceFromPool()
for _, ts := range promTS {
mimirTS = append(mimirTS, promToMimirTimeseries(&ts))
}

return mimirTS, nil
}

func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries {
labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels))
for _, label := range promTs.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

samples := make([]mimirpb.Sample, 0, len(promTs.Samples))
for _, sample := range promTs.Samples {
samples = append(samples, mimirpb.Sample{
TimestampMs: sample.Timestamp,
Value: sample.Value,
})
}

histograms := make([]mimirpb.Histogram, 0, len(promTs.Histograms))
for idx := range promTs.Histograms {
histograms = append(histograms, promToMimirHistogram(&promTs.Histograms[idx]))
}

exemplars := make([]mimirpb.Exemplar, 0, len(promTs.Exemplars))
for _, exemplar := range promTs.Exemplars {
labels := make([]mimirpb.LabelAdapter, 0, len(exemplar.Labels))
for _, label := range exemplar.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

exemplars = append(exemplars, mimirpb.Exemplar{
Labels: labels,
Value: exemplar.Value,
TimestampMs: exemplar.Timestamp,
})
}

ts := mimirpb.TimeseriesFromPool()
ts.Labels = labels
ts.Samples = samples
ts.Histograms = histograms
ts.Exemplars = exemplars

return mimirpb.PreallocTimeseries{TimeSeries: ts}
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
pSpans = append(
pSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}
nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
for _, span := range h.NegativeSpans {
nSpans = append(
nSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}

return mimirpb.Histogram{
Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
NegativeSpans: nSpans,
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: pSpans,
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
Timestamp: h.Timestamp,
ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint),
}
}

// TimeseriesToOTLPRequest is used in tests.
func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/otlp/cmd/generate/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# OTLP translation code generator

This command line utility ("generate") generates code for translating from an OTLP write request to a Mimir remote write request.
It does so by transforming corresponding vendored Prometheus Go files through the [gopatch](https://github.com/uber-go/gopatch) tool.
119 changes: 119 additions & 0 deletions pkg/distributor/otlp/cmd/generate/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// SPDX-License-Identifier: AGPL-3.0-only

package main

import (
"bufio"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"

"github.com/alecthomas/kingpin/v2"

// Dummy dependency to ensure these files get vendored and we can use them as transformation sources.
_ "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)

func main() {
app := kingpin.New("generate", "Generate OTLP translation code.")

generate := func(*kingpin.ParseContext) error {
sources, err := copySources()
if err != nil {
return err
}
if err := patchSources(sources); err != nil {
return err
}

return nil
}

app = app.Action(generate)
kingpin.MustParse(app.Parse(os.Args[1:]))
}

func patchSources(sources []string) error {
if out, err := exec.Command("gopatch", "-p", "cmd/generate/mimirpb.patch", "./").CombinedOutput(); err != nil {
return fmt.Errorf("failed to execute gopatch: %s", out)
}
sed := "sed"
if runtime.GOOS == "darwin" {
sed = "gsed"
}
for _, fname := range sources {
if out, err := exec.Command(sed, "-i", "s/PrometheusConverter/MimirConverter/g", fname).CombinedOutput(); err != nil {
return fmt.Errorf("failed to execute sed: %s", out)
}
if out, err := exec.Command(sed, "-i", "s/Prometheus remote write format/Mimir remote write format/g", fname).CombinedOutput(); err != nil {
return fmt.Errorf("failed to execute sed: %s", out)
}
if out, err := exec.Command("goimports", "-w", "-local", "github.com/grafana/mimir", fname).CombinedOutput(); err != nil {
return fmt.Errorf("failed to execute goimports: %s", out)
}
}

return nil
}

func copySources() ([]string, error) {
dpath := filepath.Join("..", "..", "..", "vendor", "github.com", "prometheus", "prometheus", "storage", "remote", "otlptranslator", "prometheusremotewrite")
entries, err := os.ReadDir(dpath)
if err != nil {
return nil, fmt.Errorf("list files in %q: %w", dpath, err)
}

var sources []string
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".go") || e.Name() == "timeseries.go" || strings.HasSuffix(e.Name(), "_test.go") {
continue
}

dst := fmt.Sprintf("%s_generated.go", e.Name()[:len(e.Name())-3])
if err := copyFile(filepath.Join(dpath, e.Name()), dst); err != nil {
return nil, err
}

sources = append(sources, dst)
}
return sources, nil
}

func copyFile(srcPath, dstPath string) (err error) {
src, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("open %q: %w", srcPath, err)
}
defer func() {
if cErr := src.Close(); cErr != nil {
err = errors.Join(fmt.Errorf("close %q: %w", srcPath, cErr), err)
}
}()

dst, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("create %q: %w", dstPath, err)
}
defer func() {
if cErr := dst.Close(); cErr != nil {
err = errors.Join(fmt.Errorf("close %q: %w", dstPath, cErr), err)
}
}()

wr := bufio.NewWriter(dst)
if _, err := wr.WriteString("// Code generated from Prometheus sources - DO NOT EDIT.\n\n"); err != nil {
return fmt.Errorf("write to %q: %w", dstPath, err)
}
if _, err := wr.ReadFrom(src); err != nil {
return fmt.Errorf("read from file %q: %w", srcPath, err)
}
if err := wr.Flush(); err != nil {
return fmt.Errorf("write to file %q: %w", dstPath, err)
}

return nil
}
Loading
Loading