Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics aggregate collector generic over temporality #2506

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// Allow to access data points of an [Aggregation].
pub trait AggregationDataPoints {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub trait AggregationDataPoints {
pub(crate) trait AggregationDataPoints {

/// The type of data point in the aggregation.
type Point;
Copy link
Contributor

@utpilla utpilla Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's name it DataPoint.

Suggested change
type Point;
type DataPoint;

/// The data points of the aggregation.
fn points(&mut self) -> &mut Vec<Self::Point>;
}

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
Expand Down Expand Up @@ -228,6 +236,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
}
}

impl<T> AggregationDataPoints for ExponentialHistogram<T> {
type Point = ExponentialHistogramDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::Point> {
&mut self.data_points
}
}

/// A single exponential histogram data point in a time series.
#[derive(Debug, PartialEq)]
pub struct ExponentialHistogramDataPoint<T> {
Expand Down
56 changes: 44 additions & 12 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ use std::{

use opentelemetry::KeyValue;

use crate::metrics::{data::Aggregation, Temporality};
use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
collector::{Collector, CumulativeValueMap, DeltaValueMap},
exponential_histogram::{ExpoHistogram, ExpoHistogramBucketConfig},
histogram::Histogram,
last_value::LastValue,
precomputed_sum::PrecomputedSum,
sum::Sum,
Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
Expand Down Expand Up @@ -58,6 +66,7 @@ where
}
}

#[derive(Clone, Copy)]
pub(crate) struct AggregateTime {
pub start: SystemTime,
pub current: SystemTime,
Expand Down Expand Up @@ -121,6 +130,12 @@ impl AttributeSetFilter {
}
}

pub(crate) trait InitAggregationData {
type Aggr: Aggregation + AggregationDataPoints;
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
Expand Down Expand Up @@ -182,15 +197,32 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> AggregateFns<T> {
ExpoHistogram::new(
self.temporality,
self.filter.clone(),
max_size,
max_scale,
record_min_max,
record_sum,
)
.into()
match self.temporality {
Temporality::Delta => ExpoHistogram {
aggregate_collector: Collector::new(
self.filter.clone(),
DeltaValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_min_max,
record_sum,
}
.into(),
_ => ExpoHistogram {
aggregate_collector: Collector::new(
self.filter.clone(),
CumulativeValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_min_max,
record_sum,
}
.into(),
}
}
}

Expand Down
195 changes: 195 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
aggregate::{AggregateTime, AttributeSetFilter},
AggregateTimeInitiator, Aggregator, InitAggregationData, ValueMap,
};

/// Aggregate measurements for attribute sets and collect these aggregates into data points for specific temporality
pub(crate) trait AggregateMap: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
}

/// Higher level abstraction (compared to [`AggregateMap`]) that also does the filtering and collection into aggregation data
pub(crate) trait AggregateCollector: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
Copy link
Contributor

@utpilla utpilla Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's getting a bit too confusing with these new coupled traits. Could we separate the concerns here? Could we update Collector trait to only have collect related methods and AggregateMap trait to only have update related methods? You could then keep an impl of both the traits as fields of ExpoHistogram struct.


fn collect<InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(
Vec<KeyValue>,
&Self::Aggr,
) -> <InitAggregate::Aggr as AggregationDataPoints>::Point;
}

pub(crate) struct Collector<AM> {
filter: AttributeSetFilter,
aggregate_map: AM,
time: AggregateTimeInitiator,
}

impl<AM> Collector<AM>
where
AM: AggregateMap,
{
pub(crate) fn new(filter: AttributeSetFilter, aggregate_map: AM) -> Self {
Self {
filter,
aggregate_map,
time: AggregateTimeInitiator::default(),
}
}

fn init_time(&self) -> AggregateTime {
if let Temporality::Delta = AM::TEMPORALITY {
self.time.delta()
} else {
self.time.cumulative()
}
}
}

impl<AM> AggregateCollector for Collector<AM>
where
AM: AggregateMap,
{
const TEMPORALITY: Temporality = AM::TEMPORALITY;

type Aggr = AM::Aggr;

fn measure(&self, value: <AM::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]) {
self.filter.apply(attributes, |filtered_attrs| {
self.aggregate_map.measure(value, filtered_attrs);
});
}

fn collect<InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(Vec<KeyValue>, &AM::Aggr) -> <InitAggregate::Aggr as AggregationDataPoints>::Point,
{
let time = self.init_time();
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<InitAggregate::Aggr>());
let mut new_agg = if s_data.is_none() {
Some(aggregate.create_new(time))

Check warning on line 100 in opentelemetry-sdk/src/metrics/internal/collector.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/collector.rs#L100

Added line #L100 was not covered by tests
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
aggregate.reset_existing(s_data, time);
self.aggregate_map
.collect_data_points(s_data.points(), create_point);

(
s_data.points().len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Delta temporality
/// Later this could be improved to support only Delta temporality
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> DeltaValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for DeltaValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Delta;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Cumulative temporality
/// Later this could be improved to support only Cumulative temporality
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> CumulativeValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for CumulativeValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Cumulative;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0.collect_readonly(dest, map_fn);
}
}
Loading
Loading