From d98d1c3fe8059343a5b342d1fd7360a7307e12a7 Mon Sep 17 00:00:00 2001
From: David Ashpole
Date: Thu, 27 Jun 2024 20:25:22 +0000
Subject: [PATCH] add filtered reservoir to eliminate unnecessary time.Now
 calls

---
 CHANGELOG.md                                  |  1 +
 sdk/metric/exemplar.go                        | 92 +++++++++----
 sdk/metric/internal/aggregate/aggregate.go    |  4 +-
 .../internal/aggregate/aggregate_test.go      |  4 +-
 .../aggregate/exponential_histogram.go        | 10 +-
 .../aggregate/exponential_histogram_test.go   | 20 ++--
 sdk/metric/internal/aggregate/histogram.go    | 12 +--
 .../internal/aggregate/histogram_test.go      | 32 +++----
 sdk/metric/internal/aggregate/lastvalue.go    | 12 +--
 .../internal/aggregate/lastvalue_test.go      | 80 ++++++++--------
 sdk/metric/internal/aggregate/sum.go          | 14 ++-
 sdk/metric/internal/aggregate/sum_test.go     | 76 +++++++--------
 sdk/metric/internal/exemplar/drop.go          | 11 +--
 sdk/metric/internal/exemplar/drop_test.go     | 18 ++--
 sdk/metric/internal/exemplar/filter.go        | 28 +++---
 sdk/metric/internal/exemplar/filter_test.go   | 29 ++----
 .../internal/exemplar/filtered_reservoir.go   | 48 ++++++++++
 sdk/metric/pipeline.go                        |  2 +-
 18 files changed, 262 insertions(+), 231 deletions(-)
 create mode 100644 sdk/metric/internal/exemplar/filtered_reservoir.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d287e9f25c2..f5e7cd012f29 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/resource`. (#5490)
 - Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/trace`. (#5490)
 - Use non-generic functions in the `Start` method of `"go.opentelemetry.io/otel/sdk/trace".Trace` to reduce memory allocation. (#5497)
+- Improve performance of metric instruments in `go.opentelemetry.io/otel/sdk/metric` by removing unnecessary calls to `time.Now`. (#5545)
 
 ### Fixed
 
diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go
index c774a4684f2d..82619da78ec1 100644
--- a/sdk/metric/exemplar.go
+++ b/sdk/metric/exemplar.go
@@ -19,67 +19,63 @@ import (
 // Note: This will only return non-nil values when the experimental exemplar
 // feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
 // is not set to always_off.
-func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
+func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
 	if !x.Exemplars.Enabled() {
 		return nil
 	}
-
-	// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
-	resF := func() func() exemplar.Reservoir {
-		// Explicit bucket histogram aggregation with more than 1 bucket will
-		// use AlignedHistogramBucketExemplarReservoir.
-		a, ok := agg.(AggregationExplicitBucketHistogram)
-		if ok && len(a.Boundaries) > 0 {
-			cp := slices.Clone(a.Boundaries)
-			return func() exemplar.Reservoir {
-				bounds := cp
-				return exemplar.Histogram(bounds)
-			}
-		}
-
-		var n int
-		if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
-			// Base2 Exponential Histogram Aggregation SHOULD use a
-			// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
-			// smaller of the maximum number of buckets configured on the
-			// aggregation or twenty (e.g. min(20, max_buckets)).
- n = int(a.MaxSize) - if n > 20 { - n = 20 - } - } else { - // https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir - // This Exemplar reservoir MAY take a configuration parameter for - // the size of the reservoir. If no size configuration is - // provided, the default size MAY be the number of possible - // concurrent threads (e.g. number of CPUs) to help reduce - // contention. Otherwise, a default size of 1 SHOULD be used. - n = runtime.NumCPU() - if n < 1 { - // Should never be the case, but be defensive. - n = 1 - } - } - - return func() exemplar.Reservoir { - return exemplar.FixedSize(n) - } - } - // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" + var filter exemplar.Filter + switch os.Getenv(filterEnvKey) { case "always_on": - return resF() + filter = exemplar.AlwaysOnFilter case "always_off": return exemplar.Drop case "trace_based": fallthrough default: - newR := resF() - return func() exemplar.Reservoir { - return exemplar.SampledFilter(newR()) + filter = exemplar.SampledFilter + } + + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults + // Explicit bucket histogram aggregation with more than 1 bucket will + // use AlignedHistogramBucketExemplarReservoir. + a, ok := agg.(AggregationExplicitBucketHistogram) + if ok && len(a.Boundaries) > 0 { + cp := slices.Clone(a.Boundaries) + return func() exemplar.FilteredReservoir[N] { + bounds := cp + return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds)) } } + + var n int + if a, ok := agg.(AggregationBase2ExponentialHistogram); ok { + // Base2 Exponential Histogram Aggregation SHOULD use a + // SimpleFixedSizeExemplarReservoir with a reservoir equal to the + // smaller of the maximum number of buckets configured on the + // aggregation or twenty (e.g. min(20, max_buckets)). + n = int(a.MaxSize) + if n > 20 { + n = 20 + } + } else { + // https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir + // This Exemplar reservoir MAY take a configuration parameter for + // the size of the reservoir. If no size configuration is + // provided, the default size MAY be the number of possible + // concurrent threads (e.g. number of CPUs) to help reduce + // contention. Otherwise, a default size of 1 SHOULD be used. + n = runtime.NumCPU() + if n < 1 { + // Should never be the case, but be defensive. + n = 1 + } + } + + return func() exemplar.FilteredReservoir[N] { + return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n)) + } } diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index c9976de6c785..b18ee719bd19 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct { // // If this is not provided a default factory function that returns an // exemplar.Drop reservoir will be used. - ReservoirFunc func() exemplar.Reservoir + ReservoirFunc func() exemplar.FilteredReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. 
Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -50,7 +50,7 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() exemplar.Reservoir { +func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 37c310a60e3f..df795022621a 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -73,8 +73,8 @@ func (c *clock) Register() (unregister func()) { return func() { now = orig } } -func dropExemplars[N int64 | float64]() exemplar.Reservoir { - return exemplar.Drop() +func dropExemplars[N int64 | float64]() exemplar.FilteredReservoir[N] { + return exemplar.Drop[N]() } func TestBuilderFilter(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 902074b5bfdc..c9c7e8f62a90 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -31,7 +31,7 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { attrs attribute.Set - res exemplar.Reservoir + res exemplar.FilteredReservoir[N] count uint64 min N @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. 
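+//
+// A minimal construction sketch with exemplar sampling disabled via
+// exemplar.Drop (160 and 20 mirror the SDK's default max size and scale;
+// ctx and attrs are assumed in scope):
+//
+//	h := newExponentialHistogram[float64](160, 20, false, false, 0, exemplar.Drop[float64])
+//	h.measure(ctx, 1.5, attrs, nil)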
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int - newRes func() exemplar.Reservoir + newRes func() exemplar.FilteredReservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -319,8 +319,6 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib return } - t := now() - e.valuesMu.Lock() defer e.valuesMu.Unlock() @@ -333,7 +331,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib e.values[attr.Equivalent()] = v } v.record(value) - v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) + v.res.Offer(ctx, value, droppedAttr) } func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 2ffd3ebf0bfc..8af8589d3a63 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -778,7 +778,7 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(9), + Time: y2kPlus(2), Count: 7, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -832,8 +832,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(10), - Time: y2kPlus(24), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Count: 7, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -850,8 +850,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { }, { Attributes: overflowSet, - StartTime: y2kPlus(10), - Time: y2kPlus(24), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Count: 6, Min: metricdata.NewExtrema[N](1), Max: metricdata.NewExtrema[N](16), @@ -905,7 +905,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(9), + Time: y2kPlus(2), Count: 7, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -938,7 +938,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(13), + Time: y2kPlus(3), Count: 10, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -967,7 +967,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(14), + Time: y2kPlus(4), Count: 10, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -1004,7 +1004,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(21), + Time: y2kPlus(5), Count: 10, Min: metricdata.NewExtrema[N](-1), Max: metricdata.NewExtrema[N](16), @@ -1022,7 +1022,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: 
y2kPlus(21), + Time: y2kPlus(5), Count: 6, Min: metricdata.NewExtrema[N](1), Max: metricdata.NewExtrema[N](16), diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 213baf50f53a..ade0941f5f5d 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -17,7 +17,7 @@ import ( type buckets[N int64 | float64] struct { attrs attribute.Set - res exemplar.Reservoir + res exemplar.FilteredReservoir[N] counts []uint64 count uint64 @@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() exemplar.Reservoir + newRes func() exemplar.FilteredReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -80,8 +80,6 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) - t := now() - s.valuesMu.Lock() defer s.valuesMu.Unlock() @@ -106,12 +104,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute if !s.noSum { b.sum(value) } - b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) + b.res.Offer(ctx, value, droppedAttr) } // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. 
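+//
+// A minimal construction sketch (the boundaries are illustrative;
+// exemplar.Drop disables exemplar sampling, and ctx and attrs are assumed
+// in scope):
+//
+//	h := newHistogram[float64]([]float64{0, 5, 10}, false, false, 0, exemplar.Drop[float64])
+//	h.measure(ctx, 7.0, attrs, nil)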
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 38ba1229eb29..cc9772da0d92 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -80,8 +80,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.DeltaTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(7)), - c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(7)), + c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(2)), + c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(2)), }, }, }, @@ -96,8 +96,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.DeltaTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 10, 1, y2kPlus(7), y2kPlus(10)), - c.hPt(fltrBob, 3, 1, y2kPlus(7), y2kPlus(10)), + c.hPt(fltrAlice, 10, 1, y2kPlus(2), y2kPlus(3)), + c.hPt(fltrBob, 3, 1, y2kPlus(2), y2kPlus(3)), }, }, }, @@ -126,9 +126,9 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.DeltaTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 1, 1, y2kPlus(11), y2kPlus(16)), - c.hPt(fltrBob, 1, 1, y2kPlus(11), y2kPlus(16)), - c.hPt(overflowSet, 1, 2, y2kPlus(11), y2kPlus(16)), + c.hPt(fltrAlice, 1, 1, y2kPlus(4), y2kPlus(5)), + c.hPt(fltrBob, 1, 1, y2kPlus(4), y2kPlus(5)), + c.hPt(overflowSet, 1, 2, y2kPlus(4), y2kPlus(5)), }, }, }, @@ -167,8 +167,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(7)), - c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(7)), + c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(2)), + c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(2)), }, }, }, @@ -183,8 +183,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(10)), - c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(10)), + c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(3)), + c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(3)), }, }, }, @@ -196,8 +196,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(11)), - c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(11)), + c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(4)), + c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(4)), }, }, }, @@ -213,9 +213,9 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { agg: metricdata.Histogram[N]{ Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[N]{ - c.hPt(fltrAlice, 2, 4, y2kPlus(0), 
y2kPlus(14)), - c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(14)), - c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(14)), + c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(5)), + c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(5)), + c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(5)), }, }, }, diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 3b65e761e862..c359368403e5 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -17,10 +17,10 @@ import ( type datapoint[N int64 | float64] struct { attrs attribute.Set value N - res exemplar.Reservoir + res exemplar.FilteredReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -33,15 +33,13 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir + newRes func() exemplar.FilteredReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - t := now() - s.Lock() defer s.Unlock() @@ -53,7 +51,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. d.attrs = attr d.value = value - d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) + d.res.Offer(ctx, value, droppedAttr) s.values[attr.Equivalent()] = d } @@ -117,7 +115,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. 
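+//
+// A minimal construction sketch (limit 0 disables the cardinality limit;
+// exemplar.Drop disables exemplar sampling):
+//
+//	lv := newPrecomputedLastValue[int64](0, exemplar.Drop[int64])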
-func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] { +func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 1e4ca21c96ad..77e0d283ba0f 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -61,13 +61,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -10, }, }, @@ -88,14 +88,14 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(8), - Time: y2kPlus(11), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Value: 10, }, { Attributes: fltrBob, - StartTime: y2kPlus(8), - Time: y2kPlus(11), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Value: 3, }, }, @@ -115,20 +115,20 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, }, @@ -165,13 +165,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -10, }, }, @@ -187,13 +187,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(3), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(3), Value: -10, }, }, @@ -211,13 +211,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(4), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(4), Value: 3, }, }, @@ -238,19 +238,19 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, }, @@ -287,13 +287,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -10, }, }, @@ -314,14 +314,14 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(8), - Time: 
y2kPlus(11), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Value: 10, }, { Attributes: fltrBob, - StartTime: y2kPlus(8), - Time: y2kPlus(11), + StartTime: y2kPlus(3), + Time: y2kPlus(4), Value: 3, }, }, @@ -341,20 +341,20 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, }, @@ -391,13 +391,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -10, }, }, @@ -419,13 +419,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(4), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(4), Value: 3, }, }, @@ -446,19 +446,19 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(16), + Time: y2kPlus(5), Value: 1, }, }, diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index babe76aba9b7..891366922600 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -15,19 +15,19 @@ import ( type sumValue[N int64 | float64] struct { n N - res exemplar.Reservoir + res exemplar.FilteredReservoir[N] attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir + newRes func() exemplar.FilteredReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -36,8 +36,6 @@ func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir) *val } func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - t := now() - s.Lock() defer s.Unlock() @@ -49,7 +47,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S v.attrs = attr v.n += value - v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) + v.res.Offer(ctx, value, droppedAttr) s.values[attr.Equivalent()] = v } @@ -57,7 +55,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. 
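+//
+// A minimal construction sketch (a monotonic sum with no cardinality limit
+// and exemplar sampling disabled; ctx and attrs are assumed in scope):
+//
+//	s := newSum[int64](true, 0, exemplar.Drop[int64])
+//	s.measure(ctx, 1, attrs, nil)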
-func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -146,7 +144,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index c20adaed500b..bb825e183757 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -75,13 +75,13 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 4, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -11, }, }, @@ -101,14 +101,14 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(7), - Time: y2kPlus(10), + StartTime: y2kPlus(2), + Time: y2kPlus(3), Value: 10, }, { Attributes: fltrBob, - StartTime: y2kPlus(7), - Time: y2kPlus(10), + StartTime: y2kPlus(2), + Time: y2kPlus(3), Value: 3, }, }, @@ -143,20 +143,20 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, - StartTime: y2kPlus(11), - Time: y2kPlus(16), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 2, }, }, @@ -203,13 +203,13 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 4, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -11, }, }, @@ -230,13 +230,13 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(10), + Time: y2kPlus(3), Value: 14, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(10), + Time: y2kPlus(3), Value: -8, }, }, @@ -258,19 +258,19 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(13), + Time: y2kPlus(4), Value: 14, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(13), + Time: y2kPlus(4), Value: -8, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(13), + Time: y2kPlus(4), Value: 2, }, }, @@ -317,13 +317,13 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 4, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: 
y2kPlus(7), + Time: y2kPlus(2), Value: -11, }, }, @@ -344,14 +344,14 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(7), - Time: y2kPlus(11), + StartTime: y2kPlus(2), + Time: y2kPlus(3), Value: 7, }, { Attributes: fltrBob, - StartTime: y2kPlus(7), - Time: y2kPlus(11), + StartTime: y2kPlus(2), + Time: y2kPlus(3), Value: 14, }, }, @@ -386,20 +386,20 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - StartTime: y2kPlus(12), - Time: y2kPlus(17), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, - StartTime: y2kPlus(12), - Time: y2kPlus(17), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, - StartTime: y2kPlus(12), - Time: y2kPlus(17), + StartTime: y2kPlus(4), + Time: y2kPlus(5), Value: 2, }, }, @@ -446,13 +446,13 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: 4, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(2), Value: -11, }, }, @@ -474,13 +474,13 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(3), Value: 11, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(3), Value: 3, }, }, @@ -516,19 +516,19 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(17), + Time: y2kPlus(5), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(17), + Time: y2kPlus(5), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(17), + Time: y2kPlus(5), Value: 2, }, }, diff --git a/sdk/metric/internal/exemplar/drop.go b/sdk/metric/internal/exemplar/drop.go index bf21e45dfaf2..5a0f39ae1478 100644 --- a/sdk/metric/internal/exemplar/drop.go +++ b/sdk/metric/internal/exemplar/drop.go @@ -5,20 +5,19 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla import ( "context" - "time" "go.opentelemetry.io/otel/attribute" ) -// Drop returns a [Reservoir] that drops all measurements it is offered. -func Drop() Reservoir { return &dropRes{} } +// Drop returns a [FilteredReservoir] that drops all measurements it is offered. +func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} } -type dropRes struct{} +type dropRes[N int64 | float64] struct{} // Offer does nothing, all measurements offered will be dropped. -func (r *dropRes) Offer(context.Context, time.Time, Value, []attribute.KeyValue) {} +func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {} // Collect resets dest. No exemplars will ever be returned. 
-func (r *dropRes) Collect(dest *[]Exemplar) {
+func (r *dropRes[N]) Collect(dest *[]Exemplar) {
 	*dest = (*dest)[:0]
 }
diff --git a/sdk/metric/internal/exemplar/drop_test.go b/sdk/metric/internal/exemplar/drop_test.go
index 9140f9e276ee..578b28acd27a 100644
--- a/sdk/metric/internal/exemplar/drop_test.go
+++ b/sdk/metric/internal/exemplar/drop_test.go
@@ -5,14 +5,20 @@ package exemplar
 
 import (
 	"testing"
+
+	"github.com/stretchr/testify/assert"
 )
 
 func TestDrop(t *testing.T) {
-	t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
-		return Drop(), 0
-	}))
+	t.Run("Int64", testDropFiltered[int64])
+	t.Run("Float64", testDropFiltered[float64])
+}
+
+func testDropFiltered[N int64 | float64](t *testing.T) {
+	r := Drop[N]()
+
+	var dest []Exemplar
+	r.Collect(&dest)
 
-	t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
-		return Drop(), 0
-	}))
+	assert.Len(t, dest, 0, "Drop reservoirs should never collect exemplars")
 }
diff --git a/sdk/metric/internal/exemplar/filter.go b/sdk/metric/internal/exemplar/filter.go
index d96aacc281aa..152a069a09e9 100644
--- a/sdk/metric/internal/exemplar/filter.go
+++ b/sdk/metric/internal/exemplar/filter.go
@@ -5,25 +5,25 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla
 
 import (
 	"context"
-	"time"
 
-	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 )
 
-// SampledFilter returns a [Reservoir] wrapping r that will only offer measurements
-// to r if the passed context associated with the measurement contains a sampled
-// [go.opentelemetry.io/otel/trace.SpanContext].
-func SampledFilter(r Reservoir) Reservoir {
-	return filtered{Reservoir: r}
-}
+// Filter determines if a measurement should be offered.
+//
+// The passed ctx needs to contain any baggage or span that were active
+// when the measurement was made. This information may be used by the
+// filter in making a sampling decision.
+type Filter func(context.Context) bool
 
-type filtered struct {
-	Reservoir
+// SampledFilter is a [Filter] that will only offer measurements
+// if the passed context associated with the measurement contains a sampled
+// [go.opentelemetry.io/otel/trace.SpanContext].
+func SampledFilter(ctx context.Context) bool {
+	return trace.SpanContextFromContext(ctx).IsSampled()
 }
 
-func (f filtered) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
-	if trace.SpanContextFromContext(ctx).IsSampled() {
-		f.Reservoir.Offer(ctx, t, n, a)
-	}
+// AlwaysOnFilter is a [Filter] that always offers measurements.
+func AlwaysOnFilter(ctx context.Context) bool {
+	return true
+}
diff --git a/sdk/metric/internal/exemplar/filter_test.go b/sdk/metric/internal/exemplar/filter_test.go
index eadcc667a8b1..d6827d5b2f93 100644
--- a/sdk/metric/internal/exemplar/filter_test.go
+++ b/sdk/metric/internal/exemplar/filter_test.go
@@ -6,11 +6,9 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla
 import (
 	"context"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/assert"
 
-	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 )
 
@@ -20,18 +18,10 @@ func TestSampledFilter(t *testing.T) {
 }
 
 func testSampledFiltered[N int64 | float64](t *testing.T) {
-	under := &res{}
-
-	r := SampledFilter(under)
-
 	ctx := context.Background()
-	r.Offer(ctx, staticTime, NewValue(N(0)), nil)
-	assert.False(t, under.OfferCalled, "underlying Reservoir Offer called")
-	r.Offer(sample(ctx), staticTime, NewValue(N(0)), nil)
-	assert.True(t, under.OfferCalled, "underlying Reservoir Offer not called")
-	r.Collect(nil)
-	assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called")
+
+	assert.False(t, SampledFilter(ctx), "non-sampled context should not be offered")
+	assert.True(t, SampledFilter(sample(ctx)), "sampled context should be offered")
 }
 
 func sample(parent context.Context) context.Context {
@@ -43,15 +33,14 @@ func sample(parent context.Context) context.Context {
 	return trace.ContextWithSpanContext(parent, sc)
 }
 
-type res struct {
-	OfferCalled   bool
-	CollectCalled bool
+func TestAlwaysOnFilter(t *testing.T) {
+	t.Run("Int64", testAlwaysOnFiltered[int64])
+	t.Run("Float64", testAlwaysOnFiltered[float64])
 }
 
-func (r *res) Offer(context.Context, time.Time, Value, []attribute.KeyValue) {
-	r.OfferCalled = true
-}
+func testAlwaysOnFiltered[N int64 | float64](t *testing.T) {
+	ctx := context.Background()
 
-func (r *res) Collect(*[]Exemplar) {
-	r.CollectCalled = true
+	assert.True(t, AlwaysOnFilter(ctx), "non-sampled context should be offered")
+	assert.True(t, AlwaysOnFilter(sample(ctx)), "sampled context should be offered")
 }
diff --git a/sdk/metric/internal/exemplar/filtered_reservoir.go b/sdk/metric/internal/exemplar/filtered_reservoir.go
new file mode 100644
index 000000000000..c80014e119f4
--- /dev/null
+++ b/sdk/metric/internal/exemplar/filtered_reservoir.go
@@ -0,0 +1,48 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+
+import (
+	"context"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+)
+
+// FilteredReservoir wraps a [Reservoir] with a filter.
+type FilteredReservoir[N int64 | float64] interface {
+	// Offer accepts the parameters associated with a measurement. The
+	// parameters will be stored as an exemplar if the filter decides to
+	// sample the measurement.
+	//
+	// The passed ctx needs to contain any baggage or span that were active
+	// when the measurement was made. This information may be used by the
+	// Reservoir in making a sampling decision.
+	Offer(ctx context.Context, val N, attr []attribute.KeyValue)
+	// Collect returns all the held exemplars in the reservoir.
+	Collect(dest *[]Exemplar)
+}
+
+// filteredReservoir handles the pre-sampled exemplar of measurements made.
+type filteredReservoir[N int64 | float64] struct {
+	filter    Filter
+	reservoir Reservoir
+}
+
+// NewFilteredReservoir creates a [FilteredReservoir] which only offers values
+// that are allowed by the filter.
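+//
+// The exemplar timestamp is read with time.Now inside Offer, and only after
+// the filter accepts the measurement, so filtered-out measurements never pay
+// for a clock read.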
+func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] {
+	return &filteredReservoir[N]{
+		filter:    f,
+		reservoir: r,
+	}
+}
+
+func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
+	if f.filter(ctx) {
+		// Only record the current time if we are sampling this measurement.
+		f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr)
+	}
+}
+
+func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) }
diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go
index c6f9597198c1..823bf2fe3d27 100644
--- a/sdk/metric/pipeline.go
+++ b/sdk/metric/pipeline.go
@@ -349,7 +349,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 	cv := i.aggregators.Lookup(normID, func() aggVal[N] {
 		b := aggregate.Builder[N]{
 			Temporality:   i.pipeline.reader.temporality(kind),
-			ReservoirFunc: reservoirFunc(stream.Aggregation),
+			ReservoirFunc: reservoirFunc[N](stream.Aggregation),
 		}
 		b.Filter = stream.AttributeFilter
 		// A value less than or equal to zero will disable the aggregation
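
Reviewer note (illustration, not part of the patch): the sketch below shows how
the new pieces compose and where the saved time.Now call went. It assumes a
caller inside the SDK module, since internal/exemplar is not importable from
outside; offerExample and the reservoir size 4 are hypothetical, while
NewFilteredReservoir, SampledFilter, FixedSize, Offer, and Collect come from
the diff above.

	package aggregate

	import (
		"context"

		"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
	)

	// offerExample shows the new measure path: the Filter runs first, and
	// time.Now is evaluated inside Offer only when the filter accepts.
	// Previously every measurement paid for a time.Now call up front, even
	// when the exemplar was ultimately dropped.
	func offerExample(ctx context.Context) []exemplar.Exemplar {
		res := exemplar.NewFilteredReservoir[float64](exemplar.SampledFilter, exemplar.FixedSize(4))

		// With an unsampled ctx, SampledFilter returns false: no timestamp
		// is taken and nothing reaches the wrapped Reservoir.
		res.Offer(ctx, 42.0, nil)

		var dest []exemplar.Exemplar
		res.Collect(&dest)
		return dest
	}

With OTEL_METRICS_EXEMPLAR_FILTER=always_off, reservoirFunc short-circuits to
exemplar.Drop, whose Offer is a no-op, so that path skips the clock as well.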