Skip to content

Commit

Permalink
Add the experimental exemplar feature (#4871)
Browse files Browse the repository at this point in the history
* Add the experimental exemplar feature

* Add exemplars to EXPERIMENTAL.md

* Add changelog entry

* Fix hist buckets > 1 detection

* Collect instead of Flush res about to be deleted

* Add e2e test

* Do not pre-alloc ResourceMetrics

This only has a single use.

* Fix grammatical error in comment

* Add test cases

Default and invalid OTEL_METRICS_EXEMPLAR_FILTER.

Test sampled and non-sampled context for trace_based.

* Comment nCPU

* Doc OTEL_METRICS_EXEMPLAR_FILTER
  • Loading branch information
MrAlias authored Jan 31, 2024
1 parent d9d9507 commit fecb92e
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
- Experimental exemplar exporting is added to the metric SDK.
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)

### Fixed

Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/EXPERIMENTAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,67 @@ Disable the cardinality limit.
unset OTEL_GO_X_CARDINALITY_LIMIT
```

### Exemplars

A sample of measurements made may be exported directly as a set of exemplars.

This experimental feature can be enabled by setting the `OTEL_GO_X_EXEMPLAR` environment variable.
The value of must be the case-insensitive string of `"true"` to enable the feature.
All other values are ignored.

Exemplar filters are a supported.
The exemplar filter applies to all measurements made.
They filter these measurements, only allowing certain measurements to be passed to the underlying exemplar reservoir.

To change the exemplar filter from the default `"trace_based"` filter set the `OTEL_METRICS_EXEMPLAR_FILTER` environment variable.
The value must be the case-sensitive string defined by the [OpenTelemetry specification].

- `"always_on"`: allows all measurements
- `"always_off"`: denies all measurements
- `"trace_based"`: allows only sampled measurements

All values other than these will result in the default, `"trace_based"`, exemplar filter being used.

[OpenTelemetry specification]: https://github.com/open-telemetry/opentelemetry-specification/blob/a6ca2fd484c9e76fe1d8e1c79c99f08f4745b5ee/specification/configuration/sdk-environment-variables.md#exemplar

#### Examples

Enable exemplars to be exported.

```console
export OTEL_GO_X_EXEMPLAR=true
```

Disable exemplars from being exported.

```console
unset OTEL_GO_X_EXEMPLAR
```

Set the exemplar filter to allow all measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=always_on
```

Set the exemplar filter to deny all measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=always_off
```

Set the exemplar filter to only allow sampled measurements.

```console
export OTEL_METRICS_EXEMPLAR_FILTER=trace_based
```

Revert to the default exemplar filter (`"trace_based"`)

```console
unset OTEL_METRICS_EXEMPLAR_FILTER
```

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
Expand Down
89 changes: 89 additions & 0 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"runtime"
"strconv"
"testing"

Expand All @@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

var viewBenchmarks = []struct {
Expand Down Expand Up @@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
}
}

func BenchmarkExemplars(b *testing.B) {
sc := trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{0o1},
TraceID: trace.TraceID{0o1},
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(context.Background(), sc)

attr := attribute.NewSet(
attribute.String("user", "Alice"),
attribute.Bool("admin", true),
)

setup := func(name string) (metric.Meter, Reader) {
r := NewManualReader()
v := NewView(Instrument{Name: "*"}, Stream{
AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("user")
},
})
mp := NewMeterProvider(WithReader(r), WithView(v))
return mp.Meter(name), r
}
nCPU := runtime.NumCPU() // Size of the fixed reservoir used.

b.Setenv("OTEL_GO_X_EXEMPLAR", "true")

name := fmt.Sprintf("Int64Counter/%d", nCPU)
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Counter")
i, err := m.Int64Counter("int64-counter")
assert.NoError(b, err)

rm := newRM(metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Add(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, nCPU)
}
})

name = fmt.Sprintf("Int64Histogram/%d", nCPU)
b.Run(name, func(b *testing.B) {
m, r := setup("Int64Counter")
i, err := m.Int64Histogram("int64-histogram")
assert.NoError(b, err)

rm := newRM(metricdata.Histogram[int64]{
DataPoints: []metricdata.HistogramDataPoint[int64]{
{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
},
})
e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for j := 0; j < 2*nCPU; j++ {
i.Record(ctx, 1, metric.WithAttributeSet(attr))
}

_ = r.Collect(ctx, rm)
assert.Len(b, *e, 1)
}
})
}

func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
return &metricdata.ResourceMetrics{
ScopeMetrics: []metricdata.ScopeMetrics{
{Metrics: []metricdata.Metrics{{Data: a}}},
},
}
}
96 changes: 96 additions & 0 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := make([]float64, len(a.Boundaries))
copy(cp, a.Boundaries)
return func() exemplar.Reservoir[N] {
bounds := cp
return exemplar.Histogram[N](bounds)
}
}

var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. numer of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
}
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

switch os.Getenv(filterEnvKey) {
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return exemplar.SampledFilter(newR())
}
}
}
37 changes: 28 additions & 9 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -54,15 +61,27 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
f(ctx, n, fAttr)
fAttr, dropped := a.Filter(fltr)
f(ctx, n, fAttr, dropped)
}
}
return f
return func(ctx context.Context, n N, a attribute.Set) {
f(ctx, n, a, nil)
}
}

// LastValue returns a last-value aggregate function input and output.
Expand All @@ -71,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit)
lv := newLastValue[N](b.AggregationLimit, b.resFunc())

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit)
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
Loading

0 comments on commit fecb92e

Please sign in to comment.