Skip to content

Commit

Permalink
OTLP: Use optimized OTel metric translator, converting directly to Mi…
Browse files Browse the repository at this point in the history
…mir format (#7957)

* Use optimized OTLP translator, converting directly to Mimir format

---------

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 authored May 30, 2024
1 parent cac654c commit 8e5f25e
Show file tree
Hide file tree
Showing 23 changed files with 1,423 additions and 16 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ jobs:
# Determine if we will deploy (aka push) the image to the registry.
is_deploy: ${{ (startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/r')) && github.event_name == 'push' && github.repository == 'grafana/mimir' }}


goversion:
runs-on: ubuntu-latest
needs: prepare
Expand Down Expand Up @@ -103,6 +102,8 @@ jobs:
run: make BUILD_IN_CONTAINER=false check-license
- name: Check Docker-Compose YAML
run: make BUILD_IN_CONTAINER=false check-mimir-microservices-mode-docker-compose-yaml check-mimir-read-write-mode-docker-compose-yaml
- name: Check Generated OTLP Code
run: make BUILD_IN_CONTAINER=false check-generated-otlp-code

doc-validator:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ images: ## Print all image names.
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# Generating OTLP translation code is automated.
OTLP_GOS := $(shell find ./pkg/distributor/otlp/ -type f -name '*_generated.go' -print)

# Building binaries is now automated. The convention is to build a binary
# for every directory with main.go in it.
MAIN_GO := $(shell find . $(DONT_FIND) -type f -name 'main.go' -print)
Expand Down Expand Up @@ -260,7 +263,7 @@ mimir-build-image/$(UPTODATE): mimir-build-image/*
# All the boiler plate for building golang follows:
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
BUILD_IN_CONTAINER ?= true
LATEST_BUILD_IMAGE_TAG ?= pr8107-6dc27b0894
LATEST_BUILD_IMAGE_TAG ?= pr7957-d7652788a7

# TTY is parameterized to allow Google Cloud Builder to run builds,
# as it currently disallows TTY devices. This value needs to be overridden
Expand Down Expand Up @@ -771,3 +774,11 @@ test-packages: packages packaging/rpm/centos-systemd/$(UPTODATE) packaging/deb/d

docs: doc
cd docs && $(MAKE) docs

.PHONY: generate-otlp
generate-otlp:
cd pkg/distributor/otlp && rm -f *_generated.go && go generate

.PHONY: check-generated-otlp-code
check-generated-otlp-code: generate-otlp
@./tools/find-diff-or-untracked.sh $(OTLP_GOS) || (echo "Please rebuild OTLP code by running 'generate-otlp'" && false)
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,17 @@
"fieldFlag": "distributor.reusable-ingester-push-workers",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "direct_otlp_translation_enabled",
"required": false,
"desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "distributor.direct-otlp-translation-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ Usage of ./cmd/mimir/mimir:
Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable.
-distributor.client-cleanup-period duration
How frequently to clean up clients for ingesters that have gone away. (default 15s)
-distributor.direct-otlp-translation-enabled
[experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true)
-distributor.drop-label string
This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.
-distributor.enable-otlp-metadata-storage
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ The following features are currently experimental:
- `-distributor.max-exemplars-per-series-per-request`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
- `-distributor.direct-otlp-translation-enabled`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,11 @@ instance_limits:
# limiting feature.)
# CLI flag: -distributor.reusable-ingester-push-workers
[reusable_ingester_push_workers: <int> | default = 2000]
# (experimental) When enabled, OTLP write requests are directly translated to
# Mimir equivalents, for optimum performance.
# CLI flag: -distributor.direct-otlp-translation-enabled
[direct_otlp_translation_enabled: <boolean> | default = true]
```

### ingester
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ require (
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down Expand Up @@ -237,7 +237,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.uber.org/zap v1.21.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions mimir-build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ RUN GO111MODULE=on \
go install github.com/google/go-jsonnet/cmd/[email protected] && \
go install github.com/norwoodj/helm-docs/cmd/[email protected] && \
go install github.com/open-policy-agent/[email protected] && \
go install github.com/uber-go/[email protected] && \
rm -rf /go/pkg /go/src /root/.cache

COPY --from=helm /usr/bin/helm /usr/bin/helm
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ type Config struct {
WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems.
DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
Expand All @@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

cfg.DefaultLimits.RegisterFlags(f)
}
Expand Down
42 changes: 39 additions & 3 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"

"github.com/grafana/mimir/pkg/distributor/otlp"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
Expand Down Expand Up @@ -53,6 +54,7 @@ func OTLPHandler(
pushMetrics *PushMetrics,
reg prometheus.Registerer,
logger log.Logger,
directTranslation bool,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

Expand Down Expand Up @@ -154,9 +156,17 @@ func OTLPHandler(
pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
var metrics []mimirpb.PreallocTimeseries
if directTranslation {
metrics, err = otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
} else {
metrics, err = otelMetricsToTimeseriesOld(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
}

metricCount := len(metrics)
Expand Down Expand Up @@ -259,6 +269,32 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
}

func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := otlp.NewMimirConverter()
errs := converter.FromMetrics(md, otlp.Settings{
AddMetricSuffixes: addSuffixes,
})
mimirTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
parseErrs = parseErrs[:maxErrMsgLen]
}

if len(mimirTS) == 0 {
return nil, errors.New(parseErrs)
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
}

return mimirTS, nil
}

// Old, less efficient, version of otelMetricsToTimeseries.
func otelMetricsToTimeseriesOld(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
errs := converter.FromMetrics(md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true)

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, false)
Expand Down
26 changes: 26 additions & 0 deletions pkg/distributor/otlp/generate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
# SPDX-License-Identifier: AGPL-3.0-only

set -euo pipefail

# Use GNU sed on MacOS falling back to `sed` everywhere else
SED=$(which gsed || which sed)

FILES=$(find ../../../vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite -name '*.go' ! -name timeseries.go ! -name "*_test.go")

for SRC in $FILES
do
BASENAME=$(basename "$SRC")
DST="${BASENAME%%.go}_generated.go"

rm -f "$DST"
echo "Processing $SRC to $DST"
printf "// Code generated from Prometheus sources - DO NOT EDIT.\n\n" >"$DST"
cat "$SRC" >> "$DST"

gopatch -p mimirpb.patch "$DST"

$SED -i "s/PrometheusConverter/MimirConverter/g" "$DST"
$SED -i "s/Prometheus remote write format/Mimir remote write format/g" "$DST"
goimports -w -local github.com/grafana/mimir "$DST"
done
Loading

0 comments on commit 8e5f25e

Please sign in to comment.