diff --git a/.github/workflows/test-build-deploy.yml b/.github/workflows/test-build-deploy.yml index 104beea3817..f051f803b7a 100644 --- a/.github/workflows/test-build-deploy.yml +++ b/.github/workflows/test-build-deploy.yml @@ -30,7 +30,6 @@ jobs: # Determine if we will deploy (aka push) the image to the registry. is_deploy: ${{ (startsWith(github.ref, 'refs/tags/') || startsWith(github.ref, 'refs/heads/r')) && github.event_name == 'push' && github.repository == 'grafana/mimir' }} - goversion: runs-on: ubuntu-latest needs: prepare @@ -103,6 +102,8 @@ jobs: run: make BUILD_IN_CONTAINER=false check-license - name: Check Docker-Compose YAML run: make BUILD_IN_CONTAINER=false check-mimir-microservices-mode-docker-compose-yaml check-mimir-read-write-mode-docker-compose-yaml + - name: Check Generated OTLP Code + run: make BUILD_IN_CONTAINER=false check-generated-otlp-code doc-validator: runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index 4de16749af6..d507e0d5077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039 * [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084 * [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191 +* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 * [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624 diff --git a/Makefile b/Makefile index 8c5562b07b5..74aade46b55 100644 --- a/Makefile +++ b/Makefile @@ -231,6 +231,9 @@ images: ## Print all image names. PROTO_DEFS := $(shell find . $(DONT_FIND) -type f -name '*.proto' -print) PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) +# Generating OTLP translation code is automated. +OTLP_GOS := $(shell find ./pkg/distributor/otlp/ -type f -name '*_generated.go' -print) + # Building binaries is now automated. The convention is to build a binary # for every directory with main.go in it. MAIN_GO := $(shell find . $(DONT_FIND) -type f -name 'main.go' -print) @@ -260,7 +263,7 @@ mimir-build-image/$(UPTODATE): mimir-build-image/* # All the boiler plate for building golang follows: SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E") BUILD_IN_CONTAINER ?= true -LATEST_BUILD_IMAGE_TAG ?= pr8107-6dc27b0894 +LATEST_BUILD_IMAGE_TAG ?= pr7957-d7652788a7 # TTY is parameterized to allow Google Cloud Builder to run builds, # as it currently disallows TTY devices. This value needs to be overridden @@ -771,3 +774,11 @@ test-packages: packages packaging/rpm/centos-systemd/$(UPTODATE) packaging/deb/d docs: doc cd docs && $(MAKE) docs + +.PHONY: generate-otlp +generate-otlp: + cd pkg/distributor/otlp && rm -f *_generated.go && go generate + +.PHONY: check-generated-otlp-code +check-generated-otlp-code: generate-otlp + @./tools/find-diff-or-untracked.sh $(OTLP_GOS) || (echo "Please rebuild OTLP code by running 'generate-otlp'" && false) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index fa2d5ee4b0e..c77a0066876 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1701,6 +1701,17 @@ "fieldFlag": "distributor.reusable-ingester-push-workers", "fieldType": "int", "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "direct_otlp_translation_enabled", + "required": false, + "desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "distributor.direct-otlp-translation-enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index deddd2ed984..1cbd27d54c6 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1095,6 +1095,8 @@ Usage of ./cmd/mimir/mimir: Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable. -distributor.client-cleanup-period duration How frequently to clean up clients for ingesters that have gone away. (default 15s) + -distributor.direct-otlp-translation-enabled + [experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true) -distributor.drop-label string This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels. -distributor.enable-otlp-metadata-storage diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 409e1e14ac8..9255af6dc48 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -73,6 +73,8 @@ The following features are currently experimental: - `-distributor.max-exemplars-per-series-per-request` - Enforce a maximum pool buffer size for write requests - `-distributor.max-request-pool-buffer-size` + - Enable direct translation from OTLP write requests to Mimir equivalents + - `-distributor.direct-otlp-translation-enabled` - Hash ring - Disabling ring heartbeat timeouts - `-distributor.ring.heartbeat-timeout=0` diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 03c2e14cb60..54141ddf7cb 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -955,6 +955,11 @@ instance_limits: # limiting feature.) # CLI flag: -distributor.reusable-ingester-push-workers [reusable_ingester_push_workers: | default = 2000] + +# (experimental) When enabled, OTLP write requests are directly translated to +# Mimir equivalents, for optimum performance. +# CLI flag: -distributor.direct-otlp-translation-enabled +[direct_otlp_translation_enabled: | default = true] ``` ### ingester diff --git a/go.mod b/go.mod index c6c5b776310..ea3262b9cd2 100644 --- a/go.mod +++ b/go.mod @@ -140,7 +140,7 @@ require ( github.com/bits-and-blooms/bitset v1.13.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -237,7 +237,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.4 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector/semconv v0.98.0 // indirect + go.opentelemetry.io/collector/semconv v0.98.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect go.uber.org/zap v1.21.0 // indirect diff --git a/mimir-build-image/Dockerfile b/mimir-build-image/Dockerfile index 66b703f2f10..288eaa580e1 100644 --- a/mimir-build-image/Dockerfile +++ b/mimir-build-image/Dockerfile @@ -56,6 +56,7 @@ RUN GO111MODULE=on \ go install github.com/google/go-jsonnet/cmd/jsonnetfmt@v0.19.1 && \ go install github.com/norwoodj/helm-docs/cmd/helm-docs@v1.8.1 && \ go install github.com/open-policy-agent/conftest@v0.42.1 && \ + go install github.com/uber-go/gopatch@v0.4.0 && \ rm -rf /go/pkg /go/src /root/.cache COPY --from=helm /usr/bin/helm /usr/bin/helm diff --git a/pkg/api/api.go b/pkg/api/api.go index a70d579dac2..43a1a495cb8 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -262,7 +262,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib distributorpb.RegisterDistributorServer(a.server.GRPC, d) a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST") - a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST") + a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST") a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{ {Desc: "Ring status", Path: "/distributor/ring"}, diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 025862ef222..2b94b59ee13 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -219,6 +219,9 @@ type Config struct { WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"` LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"` + + // DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems. + DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"` } // PushWrapper wraps around a push. It is similar to middleware.Interface. @@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.") f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.") f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)") + f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.") cfg.DefaultLimits.RegisterFlags(f) } diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index b279693af80..9fcf3738f38 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.uber.org/multierr" + "github.com/grafana/mimir/pkg/distributor/otlp" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" @@ -53,6 +54,7 @@ func OTLPHandler( pushMetrics *PushMetrics, reg prometheus.Registerer, logger log.Logger, + directTranslation bool, ) http.Handler { discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError) @@ -154,9 +156,17 @@ func OTLPHandler( pushMetrics.IncOTLPRequest(tenantID) pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize)) - metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics()) - if err != nil { - return err + var metrics []mimirpb.PreallocTimeseries + if directTranslation { + metrics, err = otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics()) + if err != nil { + return err + } + } else { + metrics, err = otelMetricsToTimeseriesOld(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics()) + if err != nil { + return err + } } metricCount := len(metrics) @@ -259,6 +269,32 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr } func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { + converter := otlp.NewMimirConverter() + errs := converter.FromMetrics(md, otlp.Settings{ + AddMetricSuffixes: addSuffixes, + }) + mimirTS := converter.TimeSeries() + if errs != nil { + dropped := len(multierr.Errors(errs)) + discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed + + parseErrs := errs.Error() + if len(parseErrs) > maxErrMsgLen { + parseErrs = parseErrs[:maxErrMsgLen] + } + + if len(mimirTS) == 0 { + return nil, errors.New(parseErrs) + } + + level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs) + } + + return mimirTS, nil +} + +// Old, less efficient, version of otelMetricsToTimeseries. +func otelMetricsToTimeseriesOld(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { converter := prometheusremotewrite.NewPrometheusConverter() errs := converter.FromMetrics(md, prometheusremotewrite.Settings{ AddMetricSuffixes: addSuffixes, diff --git a/pkg/distributor/otel_test.go b/pkg/distributor/otel_test.go index 11aeed02550..d737198c664 100644 --- a/pkg/distributor/otel_test.go +++ b/pkg/distributor/otel_test.go @@ -66,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) { validation.NewMockTenantLimits(map[string]*validation.Limits{}), ) require.NoError(b, err) - handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger()) + handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true) b.Run("protobuf", func(b *testing.B) { req := createOTLPProtoRequest(b, exportReq, false) diff --git a/pkg/distributor/otlp/generate.sh b/pkg/distributor/otlp/generate.sh new file mode 100755 index 00000000000..85a3d081713 --- /dev/null +++ b/pkg/distributor/otlp/generate.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# SPDX-License-Identifier: AGPL-3.0-only + +set -euo pipefail + +# Use GNU sed on MacOS falling back to `sed` everywhere else +SED=$(which gsed || which sed) + +FILES=$(find ../../../vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite -name '*.go' ! -name timeseries.go ! -name "*_test.go") + +for SRC in $FILES +do + BASENAME=$(basename "$SRC") + DST="${BASENAME%%.go}_generated.go" + + rm -f "$DST" + echo "Processing $SRC to $DST" + printf "// Code generated from Prometheus sources - DO NOT EDIT.\n\n" >"$DST" + cat "$SRC" >> "$DST" + + gopatch -p mimirpb.patch "$DST" + + $SED -i "s/PrometheusConverter/MimirConverter/g" "$DST" + $SED -i "s/Prometheus remote write format/Mimir remote write format/g" "$DST" + goimports -w -local github.com/grafana/mimir "$DST" +done diff --git a/pkg/distributor/otlp/helper_generated.go b/pkg/distributor/otlp/helper_generated.go new file mode 100644 index 00000000000..db896dc5ed0 --- /dev/null +++ b/pkg/distributor/otlp/helper_generated.go @@ -0,0 +1,576 @@ +// Code generated from Prometheus sources - DO NOT EDIT. + +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/helper.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package otlp + +import ( + "encoding/hex" + "fmt" + "log" + "math" + "slices" + "sort" + "strconv" + "time" + "unicode/utf8" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + + "github.com/grafana/mimir/pkg/mimirpb" + + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" +) + +const ( + sumStr = "_sum" + countStr = "_count" + bucketStr = "_bucket" + leStr = "le" + quantileStr = "quantile" + pInfStr = "+Inf" + createdSuffix = "_created" + // maxExemplarRunes is the maximum number of UTF-8 exemplar characters + // according to the prometheus specification + // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars + maxExemplarRunes = 128 + // Trace and Span id keys are defined as part of the spec: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification%2Fmetrics%2Fdatamodel.md#exemplars-2 + traceIDKey = "trace_id" + spanIDKey = "span_id" + infoType = "info" + targetMetricName = "target_info" +) + +type bucketBoundsData struct { + ts *mimirpb.TimeSeries + bound float64 +} + +// byBucketBoundsData enables the usage of sort.Sort() with a slice of bucket bounds +type byBucketBoundsData []bucketBoundsData + +func (m byBucketBoundsData) Len() int { return len(m) } +func (m byBucketBoundsData) Less(i, j int) bool { return m[i].bound < m[j].bound } +func (m byBucketBoundsData) Swap(i, j int) { m[i], m[j] = m[j], m[i] } + +// ByLabelName enables the usage of sort.Sort() with a slice of labels +type ByLabelName []mimirpb.LabelAdapter + +func (a ByLabelName) Len() int { return len(a) } +func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// timeSeriesSignature returns a hashed label set signature. +// The label slice should not contain duplicate label names; this method sorts the slice by label name before creating +// the signature. +// The algorithm is the same as in Prometheus' labels.StableHash function. +func timeSeriesSignature(labels []mimirpb.LabelAdapter) uint64 { + sort.Sort(ByLabelName(labels)) + + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + for i, v := range labels { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB+ do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range labels[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(seps) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(seps) + } + return h.Sum64() + } + + b = append(b, v.Name...) + b = append(b, seps[0]) + b = append(b, v.Value...) + b = append(b, seps[0]) + } + return xxhash.Sum64(b) +} + +var seps = []byte{'\xff'} + +// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. +// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and +// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. +func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, + ignoreAttrs []string, logOnOverwrite bool, extras ...string) []mimirpb.LabelAdapter { + resourceAttrs := resource.Attributes() + serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) + instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) + + // Calculate the maximum possible number of labels we could return so we can preallocate l + maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 + + if haveServiceName { + maxLabelCount++ + } + + if haveInstanceID { + maxLabelCount++ + } + + // map ensures no duplicate label name + l := make(map[string]string, maxLabelCount) + + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + labels := make([]mimirpb.LabelAdapter, 0, maxLabelCount) + // XXX: Should we always drop service namespace/service name/service instance ID from the labels + // (as they get mapped to other Prometheus labels)? + attributes.Range(func(key string, value pcommon.Value) bool { + if !slices.Contains(ignoreAttrs, key) { + labels = append(labels, mimirpb.LabelAdapter{Name: key, Value: value.AsString()}) + } + return true + }) + sort.Stable(ByLabelName(labels)) + + for _, label := range labels { + var finalKey = prometheustranslator.NormalizeLabel(label.Name) + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + l[finalKey] = existingValue + ";" + label.Value + } else { + l[finalKey] = label.Value + } + } + + // Map service.name + service.namespace to job + if haveServiceName { + val := serviceName.AsString() + if serviceNamespace, ok := resourceAttrs.Get(conventions.AttributeServiceNamespace); ok { + val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val) + } + l[model.JobLabel] = val + } + // Map service.instance.id to instance + if haveInstanceID { + l[model.InstanceLabel] = instance.AsString() + } + for key, value := range externalLabels { + // External labels have already been sanitized + if _, alreadyExists := l[key]; alreadyExists { + // Skip external labels if they are overridden by metric attributes + continue + } + l[key] = value + } + + for i := 0; i < len(extras); i += 2 { + if i+1 >= len(extras) { + break + } + _, found := l[extras[i]] + if found && logOnOverwrite { + log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") + } + // internal labels should be maintained + name := extras[i] + if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") { + name = prometheustranslator.NormalizeLabel(name) + } + l[name] = extras[i+1] + } + + labels = labels[:0] + for k, v := range l { + labels = append(labels, mimirpb.LabelAdapter{Name: k, Value: v}) + } + + return labels +} + +// isValidAggregationTemporality checks whether an OTel metric has a valid +// aggregation temporality for conversion to a Prometheus metric. +func isValidAggregationTemporality(metric pmetric.Metric) bool { + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return true + case pmetric.MetricTypeSum: + return metric.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + case pmetric.MetricTypeHistogram: + return metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + case pmetric.MetricTypeExponentialHistogram: + return metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative + } + return false +} + +func (c *MimirConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, + resource pcommon.Resource, settings Settings, baseName string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + + // If the sum is unset, it indicates the _sum metric point should be + // omitted + if pt.HasSum() { + // treat sum as a sample in an individual TimeSeries + sum := &mimirpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + + sumlabels := createLabels(baseName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + + } + + // treat count as a sample in an individual TimeSeries + count := &mimirpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + + countlabels := createLabels(baseName+countStr, baseLabels) + c.addSample(count, countlabels) + + // cumulative count for conversion to cumulative histogram + var cumulativeCount uint64 + + var bucketBounds []bucketBoundsData + + // process each bound, based on histograms proto definition, # of buckets = # of explicit bounds + 1 + for i := 0; i < pt.ExplicitBounds().Len() && i < pt.BucketCounts().Len(); i++ { + bound := pt.ExplicitBounds().At(i) + cumulativeCount += pt.BucketCounts().At(i) + bucket := &mimirpb.Sample{ + Value: float64(cumulativeCount), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + bucket.Value = math.Float64frombits(value.StaleNaN) + } + boundStr := strconv.FormatFloat(bound, 'f', -1, 64) + labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr) + ts := c.addSample(bucket, labels) + + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) + } + // add le=+Inf bucket + infBucket := &mimirpb.Sample{ + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + infBucket.Value = math.Float64frombits(value.StaleNaN) + } else { + infBucket.Value = float64(pt.Count()) + } + infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) + ts := c.addSample(infBucket, infLabels) + + bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) + c.addExemplars(pt, bucketBounds) + + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + labels := createLabels(baseName+createdSuffix, baseLabels) + c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) + } + } +} + +type exemplarType interface { + pmetric.ExponentialHistogramDataPoint | pmetric.HistogramDataPoint | pmetric.NumberDataPoint + Exemplars() pmetric.ExemplarSlice +} + +func getPromExemplars[T exemplarType](pt T) []mimirpb.Exemplar { + promExemplars := make([]mimirpb.Exemplar, 0, pt.Exemplars().Len()) + for i := 0; i < pt.Exemplars().Len(); i++ { + exemplar := pt.Exemplars().At(i) + exemplarRunes := 0 + + promExemplar := mimirpb.Exemplar{ + Value: exemplar.DoubleValue(), + TimestampMs: timestamp.FromTime(exemplar.Timestamp().AsTime()), + } + if traceID := exemplar.TraceID(); !traceID.IsEmpty() { + val := hex.EncodeToString(traceID[:]) + exemplarRunes += utf8.RuneCountInString(traceIDKey) + utf8.RuneCountInString(val) + promLabel := mimirpb.LabelAdapter{ + Name: traceIDKey, + Value: val, + } + promExemplar.Labels = append(promExemplar.Labels, promLabel) + } + if spanID := exemplar.SpanID(); !spanID.IsEmpty() { + val := hex.EncodeToString(spanID[:]) + exemplarRunes += utf8.RuneCountInString(spanIDKey) + utf8.RuneCountInString(val) + promLabel := mimirpb.LabelAdapter{ + Name: spanIDKey, + Value: val, + } + promExemplar.Labels = append(promExemplar.Labels, promLabel) + } + + attrs := exemplar.FilteredAttributes() + labelsFromAttributes := make([]mimirpb.LabelAdapter, 0, attrs.Len()) + attrs.Range(func(key string, value pcommon.Value) bool { + val := value.AsString() + exemplarRunes += utf8.RuneCountInString(key) + utf8.RuneCountInString(val) + promLabel := mimirpb.LabelAdapter{ + Name: key, + Value: val, + } + + labelsFromAttributes = append(labelsFromAttributes, promLabel) + + return true + }) + if exemplarRunes <= maxExemplarRunes { + // only append filtered attributes if it does not cause exemplar + // labels to exceed the max number of runes + promExemplar.Labels = append(promExemplar.Labels, labelsFromAttributes...) + } + + promExemplars = append(promExemplars, promExemplar) + } + + return promExemplars +} + +// mostRecentTimestampInMetric returns the latest timestamp in a batch of metrics +func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { + var ts pcommon.Timestamp + // handle individual metric based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = max(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = max(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = max(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = max(ts, dataPoints.At(x).Timestamp()) + } + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + for x := 0; x < dataPoints.Len(); x++ { + ts = max(ts, dataPoints.At(x).Timestamp()) + } + } + return ts +} + +func (c *MimirConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, + settings Settings, baseName string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + timestamp := convertTimeStamp(pt.Timestamp()) + baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + + // treat sum as a sample in an individual TimeSeries + sum := &mimirpb.Sample{ + Value: pt.Sum(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + sum.Value = math.Float64frombits(value.StaleNaN) + } + // sum and count of the summary should append suffix to baseName + sumlabels := createLabels(baseName+sumStr, baseLabels) + c.addSample(sum, sumlabels) + + // treat count as a sample in an individual TimeSeries + count := &mimirpb.Sample{ + Value: float64(pt.Count()), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + count.Value = math.Float64frombits(value.StaleNaN) + } + countlabels := createLabels(baseName+countStr, baseLabels) + c.addSample(count, countlabels) + + // process each percentile/quantile + for i := 0; i < pt.QuantileValues().Len(); i++ { + qt := pt.QuantileValues().At(i) + quantile := &mimirpb.Sample{ + Value: qt.Value(), + TimestampMs: timestamp, + } + if pt.Flags().NoRecordedValue() { + quantile.Value = math.Float64frombits(value.StaleNaN) + } + percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) + qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) + c.addSample(quantile, qtlabels) + } + + startTimestamp := pt.StartTimestamp() + if settings.ExportCreatedMetric && startTimestamp != 0 { + createdLabels := createLabels(baseName+createdSuffix, baseLabels) + c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + } + } +} + +// createLabels returns a copy of baseLabels, adding to it the pair model.MetricNameLabel=name. +// If extras are provided, corresponding label pairs are also added to the returned slice. +// If extras is uneven length, the last (unpaired) extra will be ignored. +func createLabels(name string, baseLabels []mimirpb.LabelAdapter, extras ...string) []mimirpb.LabelAdapter { + extraLabelCount := len(extras) / 2 + labels := make([]mimirpb.LabelAdapter, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name + copy(labels, baseLabels) + + n := len(extras) + n -= n % 2 + for extrasIdx := 0; extrasIdx < n; extrasIdx += 2 { + labels = append(labels, mimirpb.LabelAdapter{Name: extras[extrasIdx], Value: extras[extrasIdx+1]}) + } + + labels = append(labels, mimirpb.LabelAdapter{Name: model.MetricNameLabel, Value: name}) + return labels +} + +// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. +// Otherwise it creates a new one and returns that, and true. +func (c *MimirConverter) getOrCreateTimeSeries(lbls []mimirpb.LabelAdapter) (*mimirpb.TimeSeries, bool) { + h := timeSeriesSignature(lbls) + ts := c.unique[h] + if ts != nil { + if isSameMetric(ts, lbls) { + // We already have this metric + return ts, false + } + + // Look for a matching conflict + for _, cTS := range c.conflicts[h] { + if isSameMetric(cTS, lbls) { + // We already have this metric + return cTS, false + } + } + + // New conflict + ts = &mimirpb.TimeSeries{ + Labels: lbls, + } + c.conflicts[h] = append(c.conflicts[h], ts) + return ts, true + } + + // This metric is new + ts = &mimirpb.TimeSeries{ + Labels: lbls, + } + c.unique[h] = ts + return ts, true +} + +// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist. +// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp, +// both converted to milliseconds. +func (c *MimirConverter) addTimeSeriesIfNeeded(lbls []mimirpb.LabelAdapter, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { + ts, created := c.getOrCreateTimeSeries(lbls) + if created { + ts.Samples = []mimirpb.Sample{ + { + // convert ns to ms + Value: float64(convertTimeStamp(startTimestamp)), + TimestampMs: convertTimeStamp(timestamp), + }, + } + } +} + +// addResourceTargetInfo converts the resource to the target info metric. +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *MimirConverter) { + if settings.DisableTargetInfo || timestamp == 0 { + return + } + + attributes := resource.Attributes() + identifyingAttrs := []string{ + conventions.AttributeServiceNamespace, + conventions.AttributeServiceName, + conventions.AttributeServiceInstanceID, + } + nonIdentifyingAttrsCount := attributes.Len() + for _, a := range identifyingAttrs { + _, haveAttr := attributes.Get(a) + if haveAttr { + nonIdentifyingAttrsCount-- + } + } + if nonIdentifyingAttrsCount == 0 { + // If we only have job + instance, then target_info isn't useful, so don't add it. + return + } + + name := targetMetricName + if len(settings.Namespace) > 0 { + name = settings.Namespace + "_" + name + } + + labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) + haveIdentifier := false + for _, l := range labels { + if l.Name == model.JobLabel || l.Name == model.InstanceLabel { + haveIdentifier = true + break + } + } + + if !haveIdentifier { + // We need at least one identifying label to generate target_info. + return + } + + sample := &mimirpb.Sample{ + Value: float64(1), + // convert ns to ms + TimestampMs: convertTimeStamp(timestamp), + } + converter.addSample(sample, labels) +} + +// convertTimeStamp converts OTLP timestamp in ns to timestamp in ms +func convertTimeStamp(timestamp pcommon.Timestamp) int64 { + return timestamp.AsTime().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +} diff --git a/pkg/distributor/otlp/histograms_generated.go b/pkg/distributor/otlp/histograms_generated.go new file mode 100644 index 00000000000..d8aa6573ecf --- /dev/null +++ b/pkg/distributor/otlp/histograms_generated.go @@ -0,0 +1,213 @@ +// Code generated from Prometheus sources - DO NOT EDIT. + +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/histograms.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package otlp + +import ( + "fmt" + "math" + + "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/grafana/mimir/pkg/mimirpb" + + "github.com/prometheus/prometheus/model/value" +) + +const defaultZeroThreshold = 1e-128 + +func (c *MimirConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, + resource pcommon.Resource, settings Settings, baseName string) error { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + lbls := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + baseName, + ) + ts, _ := c.getOrCreateTimeSeries(lbls) + + histogram, err := exponentialToNativeHistogram(pt) + if err != nil { + return err + } + ts.Histograms = append(ts.Histograms, histogram) + + exemplars := getPromExemplars[pmetric.ExponentialHistogramDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) + } + + return nil +} + +// exponentialToNativeHistogram translates OTel Exponential Histogram data point +// to Prometheus Native Histogram. +func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint) (mimirpb.Histogram, error) { + scale := p.Scale() + if scale < -4 { + return mimirpb.Histogram{}, + fmt.Errorf("cannot convert exponential to native histogram."+ + " Scale must be >= -4, was %d", scale) + } + + var scaleDown int32 + if scale > 8 { + scaleDown = scale - 8 + scale = 8 + } + + pSpans, pDeltas := convertBucketsLayout(p.Positive(), scaleDown) + nSpans, nDeltas := convertBucketsLayout(p.Negative(), scaleDown) + + h := mimirpb.Histogram{ + // The counter reset detection must be compatible with Prometheus to + // safely set ResetHint to NO. This is not ensured currently. + // Sending a sample that triggers counter reset but with ResetHint==NO + // would lead to Prometheus panic as it does not double check the hint. + // Thus we're explicitly saying UNKNOWN here, which is always safe. + // TODO: using created time stamp should be accurate, but we + // need to know here if it was used for the detection. + // Ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28663#issuecomment-1810577303 + // Counter reset detection in Prometheus: https://github.com/prometheus/prometheus/blob/f997c72f294c0f18ca13fa06d51889af04135195/tsdb/chunkenc/histogram.go#L232 + ResetHint: mimirpb.Histogram_UNKNOWN, + Schema: scale, + + ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: p.ZeroCount()}, + // TODO use zero_threshold, if set, see + // https://github.com/open-telemetry/opentelemetry-proto/pull/441 + ZeroThreshold: defaultZeroThreshold, + + PositiveSpans: pSpans, + PositiveDeltas: pDeltas, + NegativeSpans: nSpans, + NegativeDeltas: nDeltas, + + Timestamp: convertTimeStamp(p.Timestamp()), + } + + if p.Flags().NoRecordedValue() { + h.Sum = math.Float64frombits(value.StaleNaN) + h.Count = &mimirpb.Histogram_CountInt{CountInt: value.StaleNaN} + } else { + if p.HasSum() { + h.Sum = p.Sum() + } + h.Count = &mimirpb.Histogram_CountInt{CountInt: p.Count()} + } + return h, nil +} + +// convertBucketsLayout translates OTel Exponential Histogram dense buckets +// representation to Prometheus Native Histogram sparse bucket representation. +// +// The translation logic is taken from the client_golang `histogram.go#makeBuckets` +// function, see `makeBuckets` https://github.com/prometheus/client_golang/blob/main/prometheus/histogram.go +// The bucket indexes conversion was adjusted, since OTel exp. histogram bucket +// index 0 corresponds to the range (1, base] while Prometheus bucket index 0 +// to the range (base 1]. +// +// scaleDown is the factor by which the buckets are scaled down. In other words 2^scaleDown buckets will be merged into one. +func convertBucketsLayout(buckets pmetric.ExponentialHistogramDataPointBuckets, scaleDown int32) ([]mimirpb.BucketSpan, []int64) { + bucketCounts := buckets.BucketCounts() + if bucketCounts.Len() == 0 { + return nil, nil + } + + var ( + spans []mimirpb.BucketSpan + deltas []int64 + count int64 + prevCount int64 + ) + + appendDelta := func(count int64) { + spans[len(spans)-1].Length++ + deltas = append(deltas, count-prevCount) + prevCount = count + } + + // Let the compiler figure out that this is const during this function by + // moving it into a local variable. + numBuckets := bucketCounts.Len() + + // The offset is scaled and adjusted by 1 as described above. + bucketIdx := buckets.Offset()>>scaleDown + 1 + spans = append(spans, mimirpb.BucketSpan{ + Offset: bucketIdx, + Length: 0, + }) + + for i := 0; i < numBuckets; i++ { + // The offset is scaled and adjusted by 1 as described above. + nextBucketIdx := (int32(i)+buckets.Offset())>>scaleDown + 1 + if bucketIdx == nextBucketIdx { // We have not collected enough buckets to merge yet. + count += int64(bucketCounts.At(i)) + continue + } + if count == 0 { + count = int64(bucketCounts.At(i)) + continue + } + + gap := nextBucketIdx - bucketIdx - 1 + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, mimirpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } + } + appendDelta(count) + count = int64(bucketCounts.At(i)) + bucketIdx = nextBucketIdx + } + // Need to use the last item's index. The offset is scaled and adjusted by 1 as described above. + gap := (int32(numBuckets)+buckets.Offset()-1)>>scaleDown + 1 - bucketIdx + if gap > 2 { + // We have to create a new span, because we have found a gap + // of more than two buckets. The constant 2 is copied from the logic in + // https://github.com/prometheus/client_golang/blob/27f0506d6ebbb117b6b697d0552ee5be2502c5f2/prometheus/histogram.go#L1296 + spans = append(spans, mimirpb.BucketSpan{ + Offset: gap, + Length: 0, + }) + } else { + // We have found a small gap (or no gap at all). + // Insert empty buckets as needed. + for j := int32(0); j < gap; j++ { + appendDelta(0) + } + } + appendDelta(count) + + return spans, deltas +} diff --git a/pkg/distributor/otlp/metrics_to_prw_generated.go b/pkg/distributor/otlp/metrics_to_prw_generated.go new file mode 100644 index 00000000000..2cae2bf5287 --- /dev/null +++ b/pkg/distributor/otlp/metrics_to_prw_generated.go @@ -0,0 +1,184 @@ +// Code generated from Prometheus sources - DO NOT EDIT. + +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package otlp + +import ( + "errors" + "fmt" + "sort" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +type Settings struct { + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool + SendMetadata bool +} + +// MimirConverter converts from OTel write format to Mimir remote write format. +type MimirConverter struct { + unique map[uint64]*mimirpb.TimeSeries + conflicts map[uint64][]*mimirpb.TimeSeries +} + +func NewMimirConverter() *MimirConverter { + return &MimirConverter{ + unique: map[uint64]*mimirpb.TimeSeries{}, + conflicts: map[uint64][]*mimirpb.TimeSeries{}, + } +} + +// FromMetrics converts pmetric.Metrics to Mimir remote write format. +func (c *MimirConverter) FromMetrics(md pmetric.Metrics, settings Settings) (errs error) { + resourceMetricsSlice := md.ResourceMetrics() + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + resource := resourceMetrics.Resource() + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + // keep track of the most recent timestamp in the ResourceMetrics for + // use with the "target" info metric + var mostRecentTimestamp pcommon.Timestamp + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metricSlice := scopeMetricsSlice.At(j).Metrics() + + // TODO: decide if instrumentation library information should be exported as labels + for k := 0; k < metricSlice.Len(); k++ { + metric := metricSlice.At(k) + mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric)) + + if !isValidAggregationTemporality(metric) { + errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name())) + continue + } + + promName := prometheustranslator.BuildCompliantName(metric, settings.Namespace, settings.AddMetricSuffixes) + + // handle individual metrics based on type + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dataPoints := metric.Gauge().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + c.addGaugeNumberDataPoints(dataPoints, resource, settings, promName) + case pmetric.MetricTypeSum: + dataPoints := metric.Sum().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + c.addSumNumberDataPoints(dataPoints, resource, metric, settings, promName) + case pmetric.MetricTypeHistogram: + dataPoints := metric.Histogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + c.addHistogramDataPoints(dataPoints, resource, settings, promName) + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + errs = multierr.Append(errs, c.addExponentialHistogramDataPoints( + dataPoints, + resource, + settings, + promName, + )) + case pmetric.MetricTypeSummary: + dataPoints := metric.Summary().DataPoints() + if dataPoints.Len() == 0 { + errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) + break + } + c.addSummaryDataPoints(dataPoints, resource, settings, promName) + default: + errs = multierr.Append(errs, errors.New("unsupported metric type")) + } + } + } + addResourceTargetInfo(resource, settings, mostRecentTimestamp, c) + } + + return +} + +func isSameMetric(ts *mimirpb.TimeSeries, lbls []mimirpb.LabelAdapter) bool { + if len(ts.Labels) != len(lbls) { + return false + } + for i, l := range ts.Labels { + if l.Name != ts.Labels[i].Name || l.Value != ts.Labels[i].Value { + return false + } + } + return true +} + +// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, +// the exemplar is added to the bucket bound's time series, provided that the time series' has samples. +func (c *MimirConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { + if len(bucketBounds) == 0 { + return + } + + exemplars := getPromExemplars(dataPoint) + if len(exemplars) == 0 { + return + } + + sort.Sort(byBucketBoundsData(bucketBounds)) + for _, exemplar := range exemplars { + for _, bound := range bucketBounds { + if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound { + bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar) + break + } + } + } +} + +// addSample finds a TimeSeries that corresponds to lbls, and adds sample to it. +// If there is no corresponding TimeSeries already, it's created. +// The corresponding TimeSeries is returned. +// If either lbls is nil/empty or sample is nil, nothing is done. +func (c *MimirConverter) addSample(sample *mimirpb.Sample, lbls []mimirpb.LabelAdapter) *mimirpb.TimeSeries { + if sample == nil || len(lbls) == 0 { + // This shouldn't happen + return nil + } + + ts, _ := c.getOrCreateTimeSeries(lbls) + ts.Samples = append(ts.Samples, *sample) + return ts +} diff --git a/pkg/distributor/otlp/mimirpb.patch b/pkg/distributor/otlp/mimirpb.patch new file mode 100644 index 00000000000..3a5bbc3353f --- /dev/null +++ b/pkg/distributor/otlp/mimirpb.patch @@ -0,0 +1,106 @@ +@@ +@@ +-[]prompb.Label ++[]mimirpb.LabelAdapter + +@@ +@@ +-prompb.Label ++mimirpb.LabelAdapter + +@@ +var x expression +@@ +-make([]prompb.Label, x) ++make([]mimirpb.LabelAdapter, x) + +@@ +@@ +-prompb.MetricMetadata_GAUGE ++mimirpb.GAUGE + +@@ +@@ +-prompb.MetricMetadata_COUNTER ++mimirpb.COUNTER + +@@ +@@ +-prompb.MetricMetadata_HISTOGRAM ++mimirpb.HISTOGRAM + +@@ +@@ +-prompb.MetricMetadata_SUMMARY ++mimirpb.SUMMARY + +@@ +@@ +-prompb.MetricMetadata_UNKNOWN ++mimirpb.UNKNOWN + +@@ +var x identifier +@@ +-prompb.x ++mimirpb.x + +@@ +var x expression +@@ +mimirpb.Sample{ + ..., +- Timestamp: x, ++ TimestampMs: x, + ..., +} + +@@ +var x expression +@@ +[]mimirpb.Sample{ + { + ..., +- Timestamp: x, ++ TimestampMs: x, + ..., + }, +} + +@@ +var x expression +@@ +mimirpb.Exemplar{ + ..., +- Timestamp: x, ++ TimestampMs: x, + ..., +} + +@@ +@@ +-PrometheusConverter ++MimirConverter + +@@ +@@ +-NewPrometheusConverter ++NewMimirConverter + +@@ +var x identifier +@@ +-package prometheusremotewrite ++package otlp + +# Due to a limitation of package modification, you need an additional patch directive +x + +@@ +var x identifier +@@ +-import "github.com/prometheus/prometheus/prompb" ++import "github.com/grafana/mimir/pkg/mimirpb" + +# Due to a limitation of imports modification, you need an additional patch directive +x diff --git a/pkg/distributor/otlp/number_data_points_generated.go b/pkg/distributor/otlp/number_data_points_generated.go new file mode 100644 index 00000000000..352639b3d59 --- /dev/null +++ b/pkg/distributor/otlp/number_data_points_generated.go @@ -0,0 +1,113 @@ +// Code generated from Prometheus sources - DO NOT EDIT. + +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/number_data_points.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package otlp + +import ( + "math" + + "github.com/prometheus/common/model" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/grafana/mimir/pkg/mimirpb" + + "github.com/prometheus/prometheus/model/value" +) + +func (c *MimirConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, settings Settings, name string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + labels := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) + sample := &mimirpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + c.addSample(sample, labels) + } +} + +func (c *MimirConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, + resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) { + for x := 0; x < dataPoints.Len(); x++ { + pt := dataPoints.At(x) + lbls := createAttributes( + resource, + pt.Attributes(), + settings.ExternalLabels, + nil, + true, + model.MetricNameLabel, + name, + ) + sample := &mimirpb.Sample{ + // convert ns to ms + TimestampMs: convertTimeStamp(pt.Timestamp()), + } + switch pt.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + sample.Value = float64(pt.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + sample.Value = pt.DoubleValue() + } + if pt.Flags().NoRecordedValue() { + sample.Value = math.Float64frombits(value.StaleNaN) + } + ts := c.addSample(sample, lbls) + if ts != nil { + exemplars := getPromExemplars[pmetric.NumberDataPoint](pt) + ts.Exemplars = append(ts.Exemplars, exemplars...) + } + + // add created time series if needed + if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { + startTimestamp := pt.StartTimestamp() + if startTimestamp == 0 { + return + } + + createdLabels := make([]mimirpb.LabelAdapter, len(lbls)) + copy(createdLabels, lbls) + for i, l := range createdLabels { + if l.Name == model.MetricNameLabel { + createdLabels[i].Value = name + createdSuffix + break + } + } + c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + } + } +} diff --git a/pkg/distributor/otlp/otlp_to_openmetrics_metadata_generated.go b/pkg/distributor/otlp/otlp_to_openmetrics_metadata_generated.go new file mode 100644 index 00000000000..e9d779b6ab2 --- /dev/null +++ b/pkg/distributor/otlp/otlp_to_openmetrics_metadata_generated.go @@ -0,0 +1,80 @@ +// Code generated from Prometheus sources - DO NOT EDIT. + +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/otlp_to_openmetrics_metadata.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: Copyright The OpenTelemetry Authors. + +package otlp + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) mimirpb.MetricMetadata_MetricType { + switch otelMetric.Type() { + case pmetric.MetricTypeGauge: + return mimirpb.GAUGE + case pmetric.MetricTypeSum: + metricType := mimirpb.GAUGE + if otelMetric.Sum().IsMonotonic() { + metricType = mimirpb.COUNTER + } + return metricType + case pmetric.MetricTypeHistogram: + return mimirpb.HISTOGRAM + case pmetric.MetricTypeSummary: + return mimirpb.SUMMARY + case pmetric.MetricTypeExponentialHistogram: + return mimirpb.HISTOGRAM + } + return mimirpb.UNKNOWN +} + +func OtelMetricsToMetadata(md pmetric.Metrics, addMetricSuffixes bool) []*mimirpb.MetricMetadata { + resourceMetricsSlice := md.ResourceMetrics() + + metadataLength := 0 + for i := 0; i < resourceMetricsSlice.Len(); i++ { + scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics() + for j := 0; j < scopeMetricsSlice.Len(); j++ { + metadataLength += scopeMetricsSlice.At(j).Metrics().Len() + } + } + + var metadata = make([]*mimirpb.MetricMetadata, 0, metadataLength) + for i := 0; i < resourceMetricsSlice.Len(); i++ { + resourceMetrics := resourceMetricsSlice.At(i) + scopeMetricsSlice := resourceMetrics.ScopeMetrics() + + for j := 0; j < scopeMetricsSlice.Len(); j++ { + scopeMetrics := scopeMetricsSlice.At(j) + for k := 0; k < scopeMetrics.Metrics().Len(); k++ { + metric := scopeMetrics.Metrics().At(k) + entry := mimirpb.MetricMetadata{ + Type: otelMetricTypeToPromMetricType(metric), + MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addMetricSuffixes), + Help: metric.Description(), + } + metadata = append(metadata, &entry) + } + } + } + + return metadata +} diff --git a/pkg/distributor/otlp/timeseries.go b/pkg/distributor/otlp/timeseries.go new file mode 100644 index 00000000000..6834af3d336 --- /dev/null +++ b/pkg/distributor/otlp/timeseries.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +//go:generate ./generate.sh + +package otlp + +import ( + // Ensure that prometheusremotewrite sources are vendored for generator script. + _ "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" + + "github.com/grafana/mimir/pkg/mimirpb" +) + +// TimeSeries returns a slice of the mimirpb.TimeSeries that were converted from OTel format. +func (c *MimirConverter) TimeSeries() []mimirpb.PreallocTimeseries { + totalCount := len(c.unique) + for _, ts := range c.conflicts { + totalCount += len(ts) + } + + allTS := mimirpb.PreallocTimeseriesSliceFromPool()[:0] + if cap(allTS) < totalCount { + allTS = make([]mimirpb.PreallocTimeseries, 0, totalCount) + } + for _, ts := range c.unique { + allTS = append(allTS, mimirpb.PreallocTimeseries{TimeSeries: ts}) + } + for _, cTS := range c.conflicts { + for _, ts := range cTS { + allTS = append(allTS, mimirpb.PreallocTimeseries{TimeSeries: ts}) + } + } + + return allTS +} diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index f67cd8f6adb..0c9a864f34f 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -300,7 +300,7 @@ func TestHandlerOTLPPush(t *testing.T) { t.Cleanup(pushReq.CleanUp) return tt.verifyFunc(t, pushReq) } - handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger()) + handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger(), true) resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) @@ -368,7 +368,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) { assert.False(t, request.SkipLabelNameValidation) pushReq.CleanUp() return nil - }, nil, nil, log.NewNopLogger()) + }, nil, nil, log.NewNopLogger(), true) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -414,7 +414,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { assert.Len(t, request.Timeseries, 1) assert.False(t, request.SkipLabelNameValidation) return nil - }, nil, nil, log.NewNopLogger()) + }, nil, nil, log.NewNopLogger(), true) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) @@ -440,7 +440,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) { assert.Len(t, request.Timeseries, 9) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case assert.False(t, request.SkipLabelNameValidation) return nil - }, nil, nil, log.NewNopLogger()) + }, nil, nil, log.NewNopLogger(), true) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -461,7 +461,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) { resp := httptest.NewRecorder() - handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger()) + handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger(), true) handler.ServeHTTP(resp, req) assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code) body, err := io.ReadAll(resp.Body) diff --git a/tools/add-license/main.go b/tools/add-license/main.go index c1f9cbd7935..d3226d1a4c3 100644 --- a/tools/add-license/main.go +++ b/tools/add-license/main.go @@ -65,8 +65,8 @@ func addLicense(dir string) error { return nil } - // Skip generated protos. - if strings.HasSuffix(path, ".pb.go") { + // Skip generated Go files. + if strings.HasSuffix(path, ".pb.go") || strings.HasSuffix(path, "_generated.go") { return nil }