diff --git a/CHANGELOG.md b/CHANGELOG.md
index a25a807ca70..d507e0d5077 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,7 +42,7 @@
 * [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
 * [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
 * [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
-* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. #7957
+* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false`. #7957
 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index fa2d5ee4b0e..c77a0066876 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -1701,6 +1701,17 @@
         "fieldFlag": "distributor.reusable-ingester-push-workers",
         "fieldType": "int",
         "fieldCategory": "advanced"
+      },
+      {
+        "kind": "field",
+        "name": "direct_otlp_translation_enabled",
+        "required": false,
+        "desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.",
+        "fieldValue": null,
+        "fieldDefaultValue": true,
+        "fieldFlag": "distributor.direct-otlp-translation-enabled",
+        "fieldType": "boolean",
+        "fieldCategory": "experimental"
       }
     ],
     "fieldValue": null,
diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl
index deddd2ed984..1cbd27d54c6 100644
--- a/cmd/mimir/help-all.txt.tmpl
+++ b/cmd/mimir/help-all.txt.tmpl
@@ -1095,6 +1095,8 @@ Usage of ./cmd/mimir/mimir:
       Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable.
   -distributor.client-cleanup-period duration
       How frequently to clean up clients for ingesters that have gone away. (default 15s)
+  -distributor.direct-otlp-translation-enabled
+      [experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true)
   -distributor.drop-label string
       This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.
   -distributor.enable-otlp-metadata-storage
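As a side note, a minimal, hypothetical sketch of exercising the new flag programmatically. The flag name, `distributor.Config`, the `DirectOTLPTranslationEnabled` field, and the `RegisterFlags` signature come from this diff; the standalone program, and the assumption that `distributor.Config` can be registered on a fresh `FlagSet` in isolation, are illustrative only:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/distributor"
)

func main() {
	fs := flag.NewFlagSet("mimir", flag.ContinueOnError)

	// Register all distributor flags, including the new one, with their defaults.
	var cfg distributor.Config
	cfg.RegisterFlags(fs, log.NewNopLogger())

	// Revert to the Prometheus-based OTLP translation path.
	if err := fs.Parse([]string{"-distributor.direct-otlp-translation-enabled=false"}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.DirectOTLPTranslationEnabled) // false
}
```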
diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md
index 409e1e14ac8..9255af6dc48 100644
--- a/docs/sources/mimir/configure/about-versioning.md
+++ b/docs/sources/mimir/configure/about-versioning.md
@@ -73,6 +73,8 @@ The following features are currently experimental:
   - `-distributor.max-exemplars-per-series-per-request`
 - Enforce a maximum pool buffer size for write requests
   - `-distributor.max-request-pool-buffer-size`
+- Enable direct translation from OTLP write requests to Mimir equivalents
+  - `-distributor.direct-otlp-translation-enabled`
 - Hash ring
   - Disabling ring heartbeat timeouts
     - `-distributor.ring.heartbeat-timeout=0`
diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md
index 03c2e14cb60..54141ddf7cb 100644
--- a/docs/sources/mimir/configure/configuration-parameters/index.md
+++ b/docs/sources/mimir/configure/configuration-parameters/index.md
@@ -955,6 +955,11 @@ instance_limits:
 # limiting feature.)
 # CLI flag: -distributor.reusable-ingester-push-workers
 [reusable_ingester_push_workers: <int> | default = 2000]
+
+# (experimental) When enabled, OTLP write requests are directly translated to
+# Mimir equivalents, for optimum performance.
+# CLI flag: -distributor.direct-otlp-translation-enabled
+[direct_otlp_translation_enabled: <boolean> | default = true]
 ```

 ### ingester
diff --git a/pkg/api/api.go b/pkg/api/api.go
index a70d579dac2..43a1a495cb8 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -262,7 +262,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
 	distributorpb.RegisterDistributorServer(a.server.GRPC, d)

 	a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
-	a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")
+	a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

 	a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
 		{Desc: "Ring status", Path: "/distributor/ring"},
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 025862ef222..2b94b59ee13 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -219,6 +219,9 @@ type Config struct {
 	WriteRequestsBufferPoolingEnabled           bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
 	LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled
 	ReusableIngesterPushWorkers                 int  `yaml:"reusable_ingester_push_workers" category:"advanced"`
+
+	// DirectOTLPTranslationEnabled allows reverting to the older way of translating OTLP write requests, via Prometheus, in case of problems.
+	DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"`
 }

 // PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
 	f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.")
 	f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
+	f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

 	cfg.DefaultLimits.RegisterFlags(f)
 }
diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go
index 49d0c1ed393..9fcf3738f38 100644
--- a/pkg/distributor/otel.go
+++ b/pkg/distributor/otel.go
@@ -20,6 +20,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/prompb"
 	prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
+	"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
@@ -53,6 +54,7 @@ func OTLPHandler(
 	pushMetrics *PushMetrics,
 	reg prometheus.Registerer,
 	logger log.Logger,
+	directTranslation bool,
 ) http.Handler {
 	discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

@@ -154,9 +156,17 @@
 		pushMetrics.IncOTLPRequest(tenantID)
 		pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

-		metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
-		if err != nil {
-			return err
+		var metrics []mimirpb.PreallocTimeseries
+		if directTranslation {
+			metrics, err = otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
+			if err != nil {
+				return err
+			}
+		} else {
+			metrics, err = otelMetricsToTimeseriesOld(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
+			if err != nil {
+				return err
+			}
 		}

 		metricCount := len(metrics)
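The branch above is the heart of the change: the handler picks between the new direct translation and the old Prometheus-based path per request. A hedged sketch of a benchmark comparing the two paths, modeled on `BenchmarkOTLPHandler` from `otel_test.go` (updated later in this diff); `limits`, `pushFunc`, `exportReq`, and `createOTLPProtoRequest` are assumed to be in scope as in that benchmark, and this function is not part of the PR:

```go
// Hypothetical benchmark: runs the same OTLP payload through the public
// handler with direct translation enabled and disabled.
func BenchmarkOTLPTranslationPaths(b *testing.B) {
	for _, direct := range []bool{true, false} {
		b.Run(fmt.Sprintf("direct=%t", direct), func(b *testing.B) {
			// Final argument toggles between the new and old translation paths.
			handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), direct)
			for i := 0; i < b.N; i++ {
				req := createOTLPProtoRequest(b, exportReq, false) // fresh body each iteration
				resp := httptest.NewRecorder()
				handler.ServeHTTP(resp, req)
			}
		})
	}
}
```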
@@ -283,6 +293,122 @@ func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOt
 	return mimirTS, nil
 }

+// otelMetricsToTimeseriesOld is the old, less efficient version of otelMetricsToTimeseries.
+func otelMetricsToTimeseriesOld(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
+	converter := prometheusremotewrite.NewPrometheusConverter()
+	errs := converter.FromMetrics(md, prometheusremotewrite.Settings{
+		AddMetricSuffixes: addSuffixes,
+	})
+	promTS := converter.TimeSeries()
+	if errs != nil {
+		dropped := len(multierr.Errors(errs))
+		discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed
+
+		parseErrs := errs.Error()
+		if len(parseErrs) > maxErrMsgLen {
+			parseErrs = parseErrs[:maxErrMsgLen]
+		}
+
+		if len(promTS) == 0 {
+			return nil, errors.New(parseErrs)
+		}
+
+		level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
+	}
+
+	mimirTS := mimirpb.PreallocTimeseriesSliceFromPool()
+	for _, ts := range promTS {
+		mimirTS = append(mimirTS, promToMimirTimeseries(&ts))
+	}
+
+	return mimirTS, nil
+}
+
+func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries {
+	labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels))
+	for _, label := range promTs.Labels {
+		labels = append(labels, mimirpb.LabelAdapter{
+			Name:  label.Name,
+			Value: label.Value,
+		})
+	}
+
+	samples := make([]mimirpb.Sample, 0, len(promTs.Samples))
+	for _, sample := range promTs.Samples {
+		samples = append(samples, mimirpb.Sample{
+			TimestampMs: sample.Timestamp,
+			Value:       sample.Value,
+		})
+	}
+
+	histograms := make([]mimirpb.Histogram, 0, len(promTs.Histograms))
+	for idx := range promTs.Histograms {
+		histograms = append(histograms, promToMimirHistogram(&promTs.Histograms[idx]))
+	}
+
+	exemplars := make([]mimirpb.Exemplar, 0, len(promTs.Exemplars))
+	for _, exemplar := range promTs.Exemplars {
+		labels := make([]mimirpb.LabelAdapter, 0, len(exemplar.Labels))
+		for _, label := range exemplar.Labels {
+			labels = append(labels, mimirpb.LabelAdapter{
+				Name:  label.Name,
+				Value: label.Value,
+			})
+		}
+
+		exemplars = append(exemplars, mimirpb.Exemplar{
+			Labels:      labels,
+			Value:       exemplar.Value,
+			TimestampMs: exemplar.Timestamp,
+		})
+	}
+
+	ts := mimirpb.TimeseriesFromPool()
+	ts.Labels = labels
+	ts.Samples = samples
+	ts.Histograms = histograms
+	ts.Exemplars = exemplars
+
+	return mimirpb.PreallocTimeseries{TimeSeries: ts}
+}
+
+func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
+	pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
+	for _, span := range h.PositiveSpans {
+		pSpans = append(
+			pSpans, mimirpb.BucketSpan{
+				Offset: span.Offset,
+				Length: span.Length,
+			},
+		)
+	}
+	nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
+	for _, span := range h.NegativeSpans {
+		nSpans = append(
+			nSpans, mimirpb.BucketSpan{
+				Offset: span.Offset,
+				Length: span.Length,
+			},
+		)
+	}
+
+	return mimirpb.Histogram{
+		Count:          &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
+		Sum:            h.Sum,
+		Schema:         h.Schema,
+		ZeroThreshold:  h.ZeroThreshold,
+		ZeroCount:      &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
+		NegativeSpans:  nSpans,
+		NegativeDeltas: h.NegativeDeltas,
+		NegativeCounts: h.NegativeCounts,
+		PositiveSpans:  pSpans,
+		PositiveDeltas: h.PositiveDeltas,
+		PositiveCounts: h.PositiveCounts,
+		Timestamp:      h.Timestamp,
+		ResetHint:      mimirpb.Histogram_ResetHint(h.ResetHint),
+	}
+}
+
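To make the shape of the fallback path concrete, a small hypothetical usage of the `promToMimirTimeseries` helper added above; the label and sample values are invented:

```go
// Hypothetical example: convert one Prometheus series to Mimir's format.
promTs := prompb.TimeSeries{
	Labels: []prompb.Label{
		{Name: "__name__", Value: "http_requests_total"},
		{Name: "job", Value: "api"},
	},
	Samples: []prompb.Sample{{Value: 42, Timestamp: 1718000000000}},
}

mimirTs := promToMimirTimeseries(&promTs)
// The returned TimeSeries comes from mimirpb's pool (TimeseriesFromPool above),
// so callers are expected to return it to the pool once the push completes.
fmt.Println(len(mimirTs.TimeSeries.Labels), len(mimirTs.TimeSeries.Samples)) // 2 1
```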
 // TimeseriesToOTLPRequest is used in tests.
 func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest {
 	d := pmetric.NewMetrics()
diff --git a/pkg/distributor/otel_test.go b/pkg/distributor/otel_test.go
index 11aeed02550..d737198c664 100644
--- a/pkg/distributor/otel_test.go
+++ b/pkg/distributor/otel_test.go
@@ -66,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
 		validation.NewMockTenantLimits(map[string]*validation.Limits{}),
 	)
 	require.NoError(b, err)
-	handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
+	handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true)

 	b.Run("protobuf", func(b *testing.B) {
 		req := createOTLPProtoRequest(b, exportReq, false)
diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go
index 523d26a1ee5..0c9a864f34f 100644
--- a/pkg/distributor/push_test.go
+++ b/pkg/distributor/push_test.go
@@ -300,7 +300,7 @@ func TestHandlerOTLPPush(t *testing.T) {
 				t.Cleanup(pushReq.CleanUp)
 				return tt.verifyFunc(t, pushReq)
 			}
-			handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger())
+			handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger(), true)

 			resp := httptest.NewRecorder()
 			handler.ServeHTTP(resp, req)
@@ -368,7 +368,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
 		assert.False(t, request.SkipLabelNameValidation)
 		pushReq.CleanUp()
 		return nil
-	}, nil, nil, log.NewNopLogger())
+	}, nil, nil, log.NewNopLogger(), true)
 	handler.ServeHTTP(resp, req)
 	assert.Equal(t, 200, resp.Code)
 }
@@ -414,7 +414,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
 		assert.Len(t, request.Timeseries, 1)
 		assert.False(t, request.SkipLabelNameValidation)
 		return nil
-	}, nil, nil, log.NewNopLogger())
+	}, nil, nil, log.NewNopLogger(), true)
 	handler.ServeHTTP(resp, req)
 	assert.Equal(t, 200, resp.Code)

@@ -440,7 +440,7 @@
 		assert.Len(t, request.Timeseries, 9) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case
 		assert.False(t, request.SkipLabelNameValidation)
 		return nil
-	}, nil, nil, log.NewNopLogger())
+	}, nil, nil, log.NewNopLogger(), true)
 	handler.ServeHTTP(resp, req)
 	assert.Equal(t, 200, resp.Code)
 }
@@ -461,7 +461,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

 	resp := httptest.NewRecorder()

-	handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
+	handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger(), true)
 	handler.ServeHTTP(resp, req)
 	assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
 	body, err := io.ReadAll(resp.Body)
@@ -662,43 +662,6 @@ func createPrometheusRemoteWriteProtobuf(t testing.TB) []byte {
 	return inputBytes
 }

-func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
-	pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
-	for _, span := range h.PositiveSpans {
-		pSpans = append(
-			pSpans, mimirpb.BucketSpan{
-				Offset: span.Offset,
-				Length: span.Length,
-			},
-		)
-	}
-	nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
-	for _, span := range h.NegativeSpans {
-		nSpans = append(
-			nSpans, mimirpb.BucketSpan{
-				Offset: span.Offset,
-				Length: span.Length,
-			},
-		)
-	}
-
-	return mimirpb.Histogram{
-		Count:          &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
-		Sum:            h.Sum,
-		Schema:         h.Schema,
-		ZeroThreshold:  h.ZeroThreshold,
-		ZeroCount:      &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
-		NegativeSpans:  nSpans,
-		NegativeDeltas: h.NegativeDeltas,
-		NegativeCounts: h.NegativeCounts,
-		PositiveSpans:  pSpans,
-		PositiveDeltas: h.PositiveDeltas,
-		PositiveCounts: h.PositiveCounts,
-		Timestamp:      h.Timestamp,
-		ResetHint:      mimirpb.Histogram_ResetHint(h.ResetHint),
-	}
-}
-
 func createMimirWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool) []byte {
 	t.Helper()
 	h := remote.HistogramToHistogramProto(1337, test.GenerateTestHistogram(1))
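The deleted helper is the same `promToMimirHistogram` that this PR adds to `otel.go`, promoted from test-only code to the production fallback path. A hypothetical check of the conversion, using only fields that appear in the diff (the values themselves are invented):

```go
// Hypothetical example: a delta-encoded integer histogram with two
// positive buckets, converted via the helper now living in otel.go.
h := prompb.Histogram{
	Count:          &prompb.Histogram_CountInt{CountInt: 7},
	Sum:            11.5,
	Schema:         0,
	ZeroThreshold:  1e-128,
	ZeroCount:      &prompb.Histogram_ZeroCountInt{ZeroCountInt: 1},
	PositiveSpans:  []prompb.BucketSpan{{Offset: 0, Length: 2}},
	PositiveDeltas: []int64{4, -2}, // bucket counts 4 and 2
	Timestamp:      1718000000000,
}

m := promToMimirHistogram(&h)
// Count, zero count, and sum survive the conversion unchanged.
fmt.Println(m.GetCountInt(), m.GetZeroCountInt(), m.Sum) // 7 1 11.5
```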