diff --git a/extensions-contrib/opentelemetry-extensions/README.md b/extensions-contrib/opentelemetry-extensions/README.md new file mode 100644 index 000000000000..9bb7591224ea --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/README.md @@ -0,0 +1,49 @@ + + +# OpenTelemetry Extensions + +The [OpenTelemetry](https://opentelemetry.io/) extensions provides the ability to read metrics in OTLP format + +## Configuration + +### How to enable and use the extension + +To enable the OpenTelemetry extensions, add the extension and enable the emitter in `common.runtime.properties`. + +Load the plugin: + +```properties +druid.extensions.loadList=[..., "druid-opentelemetry-extensions"] +``` + +Now Sumbit the Supervisor Config with [source input format](https://druid.apache.org/docs/latest/ingestion/data-formats/) as: + +``` +"ioConfig": { + "topic": "topic_name", + "inputFormat": { + "type": "opentelemetry-metrics-protobuf", + "metricDimension": "name" + } + . + . + . +} +``` diff --git a/extensions-contrib/opentelemetry-extensions/pom.xml b/extensions-contrib/opentelemetry-extensions/pom.xml new file mode 100644 index 000000000000..c868ea12003e --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/pom.xml @@ -0,0 +1,102 @@ + + + + 4.0.0 + + org.apache.druid.extensions.contrib + druid-opentelemetry-extensions + druid-opentelemetry-extensions + druid-opentelemetry-extensions + + + druid + org.apache.druid + 33.0.0-SNAPSHOT + ../../pom.xml + + + + com.google.protobuf + protobuf-java + + + io.opentelemetry.proto + opentelemetry-proto + + + com.google.guava + guava + provided + + + com.google.inject + guice + provided + + + com.google.code.findbugs + jsr305 + provided + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + com.fasterxml.jackson.core + jackson-databind + provided + + + org.apache.druid + druid-processing + ${project.parent.version} + provided + + + org.apache.druid + druid-indexing-service + ${project.parent.version} + provided + + + + junit + junit + test + + + + org.openjdk.jmh + jmh-core + 1.27 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.27 + test + + + diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java new file mode 100644 index 000000000000..50029e8dfbd9 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufInputFormat.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.StringUtils; + +import java.io.File; +import java.util.Objects; + +public class OpenTelemetryMetricsProtobufInputFormat implements InputFormat +{ + private static final String DEFAULT_METRIC_DIMENSION = "metric"; + private static final String DEFAULT_VALUE_DIMENSION = "value"; + private static final String DEFAULT_RESOURCE_PREFIX = "resource."; + + private final String metricDimension; + private final String valueDimension; + private final String metricAttributePrefix; + private final String resourceAttributePrefix; + + public OpenTelemetryMetricsProtobufInputFormat( + @JsonProperty("metricDimension") String metricDimension, + @JsonProperty("valueDimension") String valueDimension, + @JsonProperty("metricAttributePrefix") String metricAttributePrefix, + @JsonProperty("resourceAttributePrefix") String resourceAttributePrefix + ) + { + this.metricDimension = metricDimension != null ? metricDimension : DEFAULT_METRIC_DIMENSION; + this.valueDimension = valueDimension != null ? valueDimension : DEFAULT_VALUE_DIMENSION; + this.metricAttributePrefix = StringUtils.nullToEmptyNonDruidDataString(metricAttributePrefix); + this.resourceAttributePrefix = resourceAttributePrefix != null ? resourceAttributePrefix : DEFAULT_RESOURCE_PREFIX; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + // Sampler passes a KafkaRecordEntity directly, while the normal code path wraps the same entity in a + // SettableByteEntity + SettableByteEntity settableEntity; + if (source instanceof SettableByteEntity) { + settableEntity = (SettableByteEntity) source; + } else { + SettableByteEntity wrapper = new SettableByteEntity<>(); + wrapper.setEntity((ByteEntity) source); + settableEntity = wrapper; + } + return new OpenTelemetryMetricsProtobufReader( + inputRowSchema.getDimensionsSpec(), + settableEntity, + metricDimension, + valueDimension, + metricAttributePrefix, + resourceAttributePrefix + ); + } + + @JsonProperty + public String getMetricDimension() + { + return metricDimension; + } + + @JsonProperty + public String getValueDimension() + { + return valueDimension; + } + + @JsonProperty + public String getMetricAttributePrefix() + { + return metricAttributePrefix; + } + + @JsonProperty + public String getResourceAttributePrefix() + { + return resourceAttributePrefix; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof OpenTelemetryMetricsProtobufInputFormat)) { + return false; + } + OpenTelemetryMetricsProtobufInputFormat that = (OpenTelemetryMetricsProtobufInputFormat) o; + return Objects.equals(metricDimension, that.metricDimension) + && Objects.equals(valueDimension, that.valueDimension) + && Objects.equals(metricAttributePrefix, that.metricAttributePrefix) + && Objects.equals(resourceAttributePrefix, that.resourceAttributePrefix); + } + + @Override + public int hashCode() + { + return Objects.hash(metricDimension, valueDimension, metricAttributePrefix, resourceAttributePrefix); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java new file mode 100644 index 000000000000..af118b109be5 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.metrics.v1.DataPointFlags; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class OpenTelemetryMetricsProtobufReader implements InputEntityReader +{ + private static final Logger log = new Logger(OpenTelemetryMetricsProtobufReader.class); + + private final SettableByteEntity source; + private final String metricDimension; + private final String valueDimension; + private final String metricAttributePrefix; + private final String resourceAttributePrefix; + private final DimensionsSpec dimensionsSpec; + + public OpenTelemetryMetricsProtobufReader( + DimensionsSpec dimensionsSpec, + SettableByteEntity source, + String metricDimension, + String valueDimension, + String metricAttributePrefix, + String resourceAttributePrefix + ) + { + this.dimensionsSpec = dimensionsSpec; + this.source = source; + this.metricDimension = metricDimension; + this.valueDimension = valueDimension; + this.metricAttributePrefix = metricAttributePrefix; + this.resourceAttributePrefix = resourceAttributePrefix; + } + + @Override + public CloseableIterator read() + { + Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator()); + return CloseableIterators.withEmptyBaggage(new Iterator() { + @Override + public boolean hasNext() + { + return supplier.get().hasNext(); + } + @Override + public InputRow next() + { + return supplier.get().next(); + } + }); + } + + List readAsList() + { + ByteBuffer buffer = source.getEntity().getBuffer(); + try { + return parseMetricsData(MetricsData.parseFrom(buffer)); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(null, e, "Protobuf message could not be parsed"); + } + finally { + // Explicitly move the position assuming that all the remaining bytes have been consumed because the protobuf + // parser does not update the position itself + // In case of an exception, the buffer is moved to the end to avoid parsing it in a loop. + buffer.position(buffer.limit()); + } + } + + private List parseMetricsData(final MetricsData metricsData) + { + return metricsData.getResourceMetricsList() + .stream() + .flatMap(resourceMetrics -> { + Map resourceAttributes = resourceMetrics.getResource() + .getAttributesList() + .stream() + .collect(HashMap::new, + (m, kv) -> { + Object value = parseAnyValue(kv.getValue()); + if (value != null) { + m.put(resourceAttributePrefix + kv.getKey(), value); + } + }, + HashMap::putAll); + return resourceMetrics.getScopeMetricsList() + .stream() + .flatMap(scopeMetrics -> scopeMetrics.getMetricsList() + .stream() + .flatMap(metric -> parseMetric(metric, resourceAttributes).stream())); + }) + .collect(Collectors.toList()); + } + + private List parseMetric(Metric metric, Map resourceAttributes) + { + final List inputRows; + String metricName = metric.getName(); + switch (metric.getDataCase()) { + case SUM: { + inputRows = new ArrayList<>(metric.getSum().getDataPointsCount()); + metric.getSum() + .getDataPointsList() + .forEach(dataPoint -> { + if (hasRecordedValue(dataPoint)) { + inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)); + } + }); + break; + } + case GAUGE: { + inputRows = new ArrayList<>(metric.getGauge().getDataPointsCount()); + metric.getGauge() + .getDataPointsList() + .forEach(dataPoint -> { + if (hasRecordedValue(dataPoint)) { + inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)); + } + }); + break; + } + // TODO Support HISTOGRAM and SUMMARY metrics + case HISTOGRAM: + case SUMMARY: + default: + log.trace("Metric type %s is not supported", metric.getDataCase()); + inputRows = Collections.emptyList(); + + } + return inputRows; + } + + private static boolean hasRecordedValue(NumberDataPoint d) + { + return (d.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) == 0; + } + + private InputRow parseNumberDataPoint(NumberDataPoint dataPoint, + Map resourceAttributes, + String metricName) + { + + int capacity = resourceAttributes.size() + + dataPoint.getAttributesCount() + + 2; // metric name + value columns + Map event = Maps.newHashMapWithExpectedSize(capacity); + event.put(metricDimension, metricName); + + if (dataPoint.hasAsInt()) { + event.put(valueDimension, dataPoint.getAsInt()); + } else { + event.put(valueDimension, dataPoint.getAsDouble()); + } + + event.putAll(resourceAttributes); + dataPoint.getAttributesList().forEach(att -> { + Object value = parseAnyValue(att.getValue()); + if (value != null) { + event.put(metricAttributePrefix + att.getKey(), value); + } + }); + + return createRow(TimeUnit.NANOSECONDS.toMillis(dataPoint.getTimeUnixNano()), event); + } + + @Nullable + private static Object parseAnyValue(AnyValue value) + { + switch (value.getValueCase()) { + case INT_VALUE: + return value.getIntValue(); + case BOOL_VALUE: + return value.getBoolValue(); + case DOUBLE_VALUE: + return value.getDoubleValue(); + case STRING_VALUE: + return value.getStringValue(); + + // TODO: Support KVLIST_VALUE, ARRAY_VALUE and BYTES_VALUE + + default: + // VALUE_NOT_SET + return null; + } + } + + InputRow createRow(long timeUnixMilli, Map event) + { + final List dimensions; + if (!dimensionsSpec.getDimensionNames().isEmpty()) { + dimensions = dimensionsSpec.getDimensionNames(); + } else { + dimensions = new ArrayList<>(Sets.difference(event.keySet(), dimensionsSpec.getDimensionExclusions())); + } + return new MapBasedInputRow(timeUnixMilli, dimensions, event); + } + + @Override + public CloseableIterator sample() + { + return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java new file mode 100644 index 000000000000..4c027c31248c --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryProtobufExtensionsModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class OpenTelemetryProtobufExtensionsModule implements DruidModule +{ + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("OpenTelemetryProtobufInputFormat") + .registerSubtypes( + new NamedType(OpenTelemetryMetricsProtobufInputFormat.class, "opentelemetry-metrics-protobuf") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100755 index 000000000000..b2a7d04bb635 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryProtobufExtensionsModule + diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java new file mode 100644 index 000000000000..0238aeccafa5 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryBenchmark.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.NumberDataPoint; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; +import io.opentelemetry.proto.resource.v1.Resource; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +@Fork(1) +@State(Scope.Benchmark) +public class OpenTelemetryBenchmark +{ + + private static ByteBuffer BUFFER; + + @Param(value = {"1", "2", "4", "8" }) + private int resourceMetricCount = 1; + + @Param(value = {"1"}) + private int instrumentationScopeCount = 1; + + @Param(value = {"1", "2", "4", "8" }) + private int metricsCount = 1; + + @Param(value = {"1", "2", "4", "8" }) + private int dataPointCount; + + private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli()); + + private static final InputRowSchema ROW_SCHEMA = new InputRowSchema(null, + new DimensionsSpec(ImmutableList.of( + new StringDimensionSchema("name"), + new StringDimensionSchema("value"), + new StringDimensionSchema("foo_key"))), + null); + + private static final OpenTelemetryMetricsProtobufInputFormat INPUT_FORMAT = + new OpenTelemetryMetricsProtobufInputFormat("name", + "value", + "", + "resource."); + + private ByteBuffer createMetricBuffer() + { + MetricsData.Builder metricsData = MetricsData.newBuilder(); + for (int i = 0; i < resourceMetricCount; i++) { + ResourceMetrics.Builder resourceMetricsBuilder = metricsData.addResourceMetricsBuilder(); + Resource.Builder resourceBuilder = resourceMetricsBuilder.getResourceBuilder(); + + for (int resourceAttributeI = 0; resourceAttributeI < 5; resourceAttributeI++) { + KeyValue.Builder resourceAttributeBuilder = resourceBuilder.addAttributesBuilder(); + resourceAttributeBuilder.setKey("resource.label_key_" + resourceAttributeI); + resourceAttributeBuilder.setValue(AnyValue.newBuilder().setStringValue("resource.label_value")); + } + + for (int j = 0; j < instrumentationScopeCount; j++) { + ScopeMetrics.Builder scopeMetricsBuilder = + resourceMetricsBuilder.addScopeMetricsBuilder(); + + for (int k = 0; k < metricsCount; k++) { + Metric.Builder metricBuilder = scopeMetricsBuilder.addMetricsBuilder(); + metricBuilder.setName("io.confluent.domain/such/good/metric/wow"); + + for (int l = 0; l < dataPointCount; l++) { + NumberDataPoint.Builder dataPointBuilder = metricBuilder.getSumBuilder().addDataPointsBuilder(); + dataPointBuilder.setAsDouble(42.0).setTimeUnixNano(TIMESTAMP); + + for (int metricAttributeI = 0; metricAttributeI < 10; metricAttributeI++) { + KeyValue.Builder attributeBuilder = dataPointBuilder.addAttributesBuilder(); + attributeBuilder.setKey("foo_key_" + metricAttributeI); + attributeBuilder.setValue(AnyValue.newBuilder().setStringValue("foo-value")); + } + } + } + } + } + return ByteBuffer.wrap(metricsData.build().toByteArray()); + } + + @Setup + public void init() + { + BUFFER = createMetricBuffer(); + } + + @Benchmark() + public void measureSerde(Blackhole blackhole) throws IOException + { + for (CloseableIterator it = INPUT_FORMAT.createReader(ROW_SCHEMA, new ByteEntity(BUFFER), null).read(); it.hasNext(); ) { + InputRow row = it.next(); + blackhole.consume(row); + } + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java new file mode 100644 index 000000000000..536247ab5716 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsInputFormatTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.junit.Assert; +import org.junit.Test; + +public class OpenTelemetryMetricsInputFormatTest +{ + @Test + public void testSerde() throws Exception + { + OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat( + "metric.name", + "raw.value", + "descriptor.", + "custom." + ); + + final ObjectMapper jsonMapper = new ObjectMapper(); + jsonMapper.registerModules(new OpenTelemetryProtobufExtensionsModule().getJacksonModules()); + + final OpenTelemetryMetricsProtobufInputFormat actual = (OpenTelemetryMetricsProtobufInputFormat) jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + InputFormat.class + ); + Assert.assertEquals(inputFormat, actual); + Assert.assertEquals("metric.name", actual.getMetricDimension()); + Assert.assertEquals("raw.value", actual.getValueDimension()); + Assert.assertEquals("descriptor.", actual.getMetricAttributePrefix()); + Assert.assertEquals("custom.", actual.getResourceAttributePrefix()); + } + + @Test + public void testDefaults() + { + OpenTelemetryMetricsProtobufInputFormat inputFormat = new OpenTelemetryMetricsProtobufInputFormat( + null, + null, + null, + null + ); + + Assert.assertEquals("metric", inputFormat.getMetricDimension()); + Assert.assertEquals("value", inputFormat.getValueDimension()); + Assert.assertEquals("", inputFormat.getMetricAttributePrefix()); + Assert.assertEquals("resource.", inputFormat.getResourceAttributePrefix()); + } +} diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java new file mode 100644 index 000000000000..b5bca8bd7da4 --- /dev/null +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.opentelemetry.protobuf; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import io.opentelemetry.proto.metrics.v1.DataPointFlags; +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; + +public class OpenTelemetryMetricsProtobufReaderTest +{ + private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli()); + public static final String RESOURCE_ATTRIBUTE_COUNTRY = "country"; + public static final String RESOURCE_ATTRIBUTE_VALUE_USA = "usa"; + + public static final String RESOURCE_ATTRIBUTE_ENV = "env"; + public static final String RESOURCE_ATTRIBUTE_VALUE_DEVEL = "devel"; + + public static final String INSTRUMENTATION_SCOPE_NAME = "mock-instr-lib"; + public static final String INSTRUMENTATION_SCOPE_VERSION = "1.0"; + + public static final String METRIC_ATTRIBUTE_COLOR = "color"; + public static final String METRIC_ATTRIBUTE_VALUE_RED = "red"; + + public static final String METRIC_ATTRIBUTE_FOO_KEY = "foo_key"; + public static final String METRIC_ATTRIBUTE_FOO_VAL = "foo_value"; + + private final MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); + + private final Metric.Builder metricBuilder = metricsDataBuilder.addResourceMetricsBuilder() + .addScopeMetricsBuilder() + .addMetricsBuilder(); + + private final DimensionsSpec dimensionsSpec = new DimensionsSpec(ImmutableList.of( + new StringDimensionSchema("descriptor." + METRIC_ATTRIBUTE_COLOR), + new StringDimensionSchema("descriptor." + METRIC_ATTRIBUTE_FOO_KEY), + new StringDimensionSchema("custom." + RESOURCE_ATTRIBUTE_ENV), + new StringDimensionSchema("custom." + RESOURCE_ATTRIBUTE_COUNTRY) + )); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() + { + metricsDataBuilder + .getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey(RESOURCE_ATTRIBUTE_COUNTRY) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_USA))); + + metricsDataBuilder + .getResourceMetricsBuilder(0) + .getScopeMetricsBuilder(0) + .getScopeBuilder() + .setName(INSTRUMENTATION_SCOPE_NAME) + .setVersion(INSTRUMENTATION_SCOPE_VERSION); + + } + + @Test + public void testSumWithAttributes() + { + metricBuilder + .setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + List rowList = new ArrayList<>(); + rows.forEachRemaining(rowList::add); + Assert.assertEquals(1, rowList.size()); + + InputRow row = rowList.get(0); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + } + + @Test + public void testGaugeWithAttributes() + { + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + } + + @Test + public void testBatchedMetricParse() + { + metricBuilder.setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()); + + // Create Second Metric + Metric.Builder gaugeMetricBuilder = metricsDataBuilder.addResourceMetricsBuilder() + .addScopeMetricsBuilder() + .addMetricsBuilder(); + + metricsDataBuilder.getResourceMetricsBuilder(1) + .getResourceBuilder() + .addAttributes(KeyValue.newBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_DEVEL)) + .build()); + + metricsDataBuilder.getResourceMetricsBuilder(1) + .getScopeMetricsBuilder(0) + .getScopeBuilder() + .setName(INSTRUMENTATION_SCOPE_NAME) + .setVersion(INSTRUMENTATION_SCOPE_VERSION); + + gaugeMetricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(8) + .setTimeUnixNano(TIMESTAMP) + .addAttributesBuilder() // test sum with attributes + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_FOO_VAL).build()); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + assertDimensionEquals(row, "raw.value", "6"); + + Assert.assertTrue(rows.hasNext()); + row = rows.next(); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "custom.env", "devel"); + assertDimensionEquals(row, "descriptor.foo_key", "foo_value"); + assertDimensionEquals(row, "raw.value", "8"); + + } + + @Test + public void testDimensionSpecExclusions() + { + metricsDataBuilder.getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributesBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setStringValue(RESOURCE_ATTRIBUTE_VALUE_DEVEL).build()); + + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAllAttributes(ImmutableList.of( + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()).build(), + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_FOO_VAL).build()).build())); + + MetricsData metricsData = metricsDataBuilder.build(); + + DimensionsSpec dimensionsSpecWithExclusions = DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of( + "descriptor." + METRIC_ATTRIBUTE_COLOR, + "custom." + RESOURCE_ATTRIBUTE_COUNTRY + )).build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpecWithExclusions, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertTrue(rows.hasNext()); + InputRow row = rows.next(); + + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_gauge"); + assertDimensionEquals(row, "raw.value", "6"); + assertDimensionEquals(row, "custom.env", "devel"); + assertDimensionEquals(row, "descriptor.foo_key", "foo_value"); + Assert.assertFalse(row.getDimensions().contains("custom.country")); + Assert.assertFalse(row.getDimensions().contains("descriptor.color")); + } + + @Test + public void testUnsupportedValueTypes() + { + KeyValueList kvList = KeyValueList.newBuilder() + .addValues( + KeyValue.newBuilder() + .setKey("foo") + .setValue(AnyValue.newBuilder().setStringValue("bar").build())) + .build(); + + metricsDataBuilder.getResourceMetricsBuilder(0) + .getResourceBuilder() + .addAttributesBuilder() + .setKey(RESOURCE_ATTRIBUTE_ENV) + .setValue(AnyValue.newBuilder().setKvlistValue(kvList).build()); + + metricBuilder + .setName("example_sum") + .getSumBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setTimeUnixNano(TIMESTAMP) + .addAllAttributes(ImmutableList.of( + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_COLOR) + .setValue(AnyValue.newBuilder().setStringValue(METRIC_ATTRIBUTE_VALUE_RED).build()).build(), + KeyValue.newBuilder() + .setKey(METRIC_ATTRIBUTE_FOO_KEY) + .setValue(AnyValue.newBuilder().setKvlistValue(kvList).build()).build())); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + List rowList = new ArrayList<>(); + rows.forEachRemaining(rowList::add); + Assert.assertEquals(1, rowList.size()); + + InputRow row = rowList.get(0); + Assert.assertEquals(4, row.getDimensions().size()); + assertDimensionEquals(row, "metric.name", "example_sum"); + assertDimensionEquals(row, "custom.country", "usa"); + assertDimensionEquals(row, "descriptor.color", "red"); + + // Unsupported resource attribute type is omitted + Assert.assertEquals(0, row.getDimension("custom.env").size()); + + // Unsupported metric attribute type is omitted + Assert.assertEquals(0, row.getDimension("descriptor.foo_key").size()); + + assertDimensionEquals(row, "raw.value", "6"); + } + + @Test + public void testInvalidProtobuf() + { + byte[] invalidProtobuf = new byte[] {0x00, 0x01}; + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(invalidProtobuf)); + try (CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read()) { + Assert.assertThrows(ParseException.class, () -> rows.hasNext()); + Assert.assertThrows(NoSuchElementException.class, () -> rows.next()); + } + catch (IOException e) { + // Comes from the implicit call to close. Ignore + } + } + + @Test + public void testInvalidMetricType() + { + metricBuilder + .setName("unsupported_histogram_metric") + .getExponentialHistogramBuilder() + .addDataPointsBuilder() + .setTimeUnixNano(TIMESTAMP); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + List rowList = new ArrayList<>(); + rows.forEachRemaining(rowList::add); + Assert.assertEquals(0, rowList.size()); + } + + @Test + public void testNoRecordedValueMetric() + { + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setFlags(DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) + .setTimeUnixNano(TIMESTAMP); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertFalse(rows.hasNext()); + } + + private void assertDimensionEquals(InputRow row, String dimension, Object expected) + { + List values = row.getDimension(dimension); + Assert.assertEquals(1, values.size()); + Assert.assertEquals(expected, values.get(0)); + } + +} diff --git a/pom.xml b/pom.xml index a44f0acfce0b..d6568f56d697 100644 --- a/pom.xml +++ b/pom.xml @@ -119,6 +119,7 @@ 0.8.12 6.2.5.Final 4.5.13 + 0.19.0-alpha 3.8.4 3.48.1 @@ -257,6 +258,7 @@ extensions-contrib/spectator-histogram extensions-contrib/rabbit-stream-indexing-service extensions-contrib/druid-ranger-security + extensions-contrib/opentelemetry-extensions distribution @@ -1237,6 +1239,11 @@ postgresql ${postgresql.version} + + io.opentelemetry.proto + opentelemetry-proto + ${opentelemetry.proto.version} +