Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP: Use optimized OTel metric translator, converting directly to Mimir format #7957

Merged
merged 12 commits into from
May 30, 2024
Merged
3 changes: 2 additions & 1 deletion .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ jobs:
# Determine if we will deploy (aka push) the image to the registry.
is_deploy: ${{ (startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/r')) && github.event_name == 'push' && github.repository == 'grafana/mimir' }}


goversion:
runs-on: ubuntu-latest
needs: prepare
Expand Down Expand Up @@ -103,6 +102,8 @@ jobs:
run: make BUILD_IN_CONTAINER=false check-license
- name: Check Docker-Compose YAML
run: make BUILD_IN_CONTAINER=false check-mimir-microservices-mode-docker-compose-yaml check-mimir-read-write-mode-docker-compose-yaml
- name: Check Generated OTLP Code
run: make BUILD_IN_CONTAINER=false check-generated-otlp-code

doc-validator:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ images: ## Print all image names.
PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print)
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# Generating OTLP translation code is automated.
OTLP_GOS := $(shell find ./pkg/distributor/otlp/ -type f -name '*_generated.go' -print)

# Building binaries is now automated. The convention is to build a binary
# for every directory with main.go in it.
MAIN_GO := $(shell find . $(DONT_FIND) -type f -name 'main.go' -print)
Expand Down Expand Up @@ -260,7 +263,7 @@ mimir-build-image/$(UPTODATE): mimir-build-image/*
# All the boiler plate for building golang follows:
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
BUILD_IN_CONTAINER ?= true
LATEST_BUILD_IMAGE_TAG ?= pr8107-6dc27b0894
LATEST_BUILD_IMAGE_TAG ?= pr7957-d7652788a7

# TTY is parameterized to allow Google Cloud Builder to run builds,
# as it currently disallows TTY devices. This value needs to be overridden
Expand Down Expand Up @@ -771,3 +774,11 @@ test-packages: packages packaging/rpm/centos-systemd/$(UPTODATE) packaging/deb/d

docs: doc
cd docs && $(MAKE) docs

.PHONY: generate-otlp
generate-otlp:
cd pkg/distributor/otlp && rm -f *_generated.go && go generate

.PHONY: check-generated-otlp-code
check-generated-otlp-code: generate-otlp
@./tools/find-diff-or-untracked.sh $(OTLP_GOS) || (echo "Please rebuild OTLP code by running 'generate-otlp'" && false)
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,17 @@
"fieldFlag": "distributor.reusable-ingester-push-workers",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "direct_otlp_translation_enabled",
"required": false,
"desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "distributor.direct-otlp-translation-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ Usage of ./cmd/mimir/mimir:
Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable.
-distributor.client-cleanup-period duration
How frequently to clean up clients for ingesters that have gone away. (default 15s)
-distributor.direct-otlp-translation-enabled
[experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true)
-distributor.drop-label string
This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.
-distributor.enable-otlp-metadata-storage
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ The following features are currently experimental:
- `-distributor.max-exemplars-per-series-per-request`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
- `-distributor.direct-otlp-translation-enabled`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,11 @@ instance_limits:
# limiting feature.)
# CLI flag: -distributor.reusable-ingester-push-workers
[reusable_ingester_push_workers: <int> | default = 2000]

# (experimental) When enabled, OTLP write requests are directly translated to
# Mimir equivalents, for optimum performance.
# CLI flag: -distributor.direct-otlp-translation-enabled
[direct_otlp_translation_enabled: <boolean> | default = true]
```

### ingester
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ require (
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down Expand Up @@ -237,7 +237,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0 // indirect
go.opentelemetry.io/collector/semconv v0.98.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.uber.org/zap v1.21.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions mimir-build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ RUN GO111MODULE=on \
go install github.com/google/go-jsonnet/cmd/[email protected] && \
go install github.com/norwoodj/helm-docs/cmd/[email protected] && \
go install github.com/open-policy-agent/[email protected] && \
go install github.com/uber-go/[email protected] && \
rm -rf /go/pkg /go/src /root/.cache

COPY --from=helm /usr/bin/helm /usr/bin/helm
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ type Config struct {
WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems.
DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
Expand All @@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

cfg.DefaultLimits.RegisterFlags(f)
}
Expand Down
42 changes: 39 additions & 3 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/multierr"

"github.com/grafana/mimir/pkg/distributor/otlp"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
Expand Down Expand Up @@ -53,6 +54,7 @@ func OTLPHandler(
pushMetrics *PushMetrics,
reg prometheus.Registerer,
logger log.Logger,
directTranslation bool,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

Expand Down Expand Up @@ -154,9 +156,17 @@ func OTLPHandler(
pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
var metrics []mimirpb.PreallocTimeseries
if directTranslation {
metrics, err = otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
} else {
metrics, err = otelMetricsToTimeseriesOld(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
}

metricCount := len(metrics)
Expand Down Expand Up @@ -259,6 +269,32 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
}

func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := otlp.NewMimirConverter()
errs := converter.FromMetrics(md, otlp.Settings{
AddMetricSuffixes: addSuffixes,
})
mimirTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
parseErrs = parseErrs[:maxErrMsgLen]
}

if len(mimirTS) == 0 {
return nil, errors.New(parseErrs)
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
}

return mimirTS, nil
}

// Old, less efficient, version of otelMetricsToTimeseries.
func otelMetricsToTimeseriesOld(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
errs := converter.FromMetrics(md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true)

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, false)
Expand Down
26 changes: 26 additions & 0 deletions pkg/distributor/otlp/generate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
# SPDX-License-Identifier: AGPL-3.0-only

set -euo pipefail

# Use GNU sed on MacOS falling back to `sed` everywhere else
SED=$(which gsed || which sed)

FILES=$(find ../../../vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite -name '*.go' ! -name timeseries.go ! -name "*_test.go")

for SRC in $FILES
do
BASENAME=$(basename "$SRC")
DST="${BASENAME%%.go}_generated.go"

rm -f "$DST"
echo "Processing $SRC to $DST"
printf "// Code generated from Prometheus sources - DO NOT EDIT.\n\n" >"$DST"
cat "$SRC" >> "$DST"

gopatch -p mimirpb.patch "$DST"

$SED -i "s/PrometheusConverter/MimirConverter/g" "$DST"
$SED -i "s/Prometheus remote write format/Mimir remote write format/g" "$DST"
goimports -w -local github.com/grafana/mimir "$DST"
done
Loading
Loading