Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WithExplicitBucketBoundaries option in the metric SDK #4605

Merged
merged 7 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660)
- Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622)
- `go.opentelemetry.io/otel/bridge/opencensus.NewMetricProducer` now supports exemplars from OpenCensus. (#4585)
- Add support for `WithExplicitBucketBoundaries` in `go.opentelemetry.io/otel/sdk/metric`. (#4605)

### Deprecated

Expand Down
54 changes: 50 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
// distribution of int64 measurements during a computational operation.
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -188,9 +187,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
// distribution of float64 measurements during a computational operation.
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
Expand Down Expand Up @@ -456,12 +454,36 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag
return p.int64Resolver.Aggregators(inst)
}

func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
boundaries := cfg.ExplicitBucketBoundaries()
aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
if aggError != nil {
// If boundaries are invalid, ignore them.
boundaries = nil
}
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.int64Resolver.HistogramAggregators(inst, boundaries)
return measures, errors.Join(aggError, err)
}

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }

Expand All @@ -476,12 +498,36 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]
return p.float64Resolver.Aggregators(inst)
}

func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
boundaries := cfg.ExplicitBucketBoundaries()
aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
if aggError != nil {
// If boundaries are invalid, ignore them.
boundaries = nil
}
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.float64Resolver.HistogramAggregators(inst, boundaries)
return measures, errors.Join(aggError, err)
}

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
Expand Down
83 changes: 83 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -550,6 +551,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {

wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Int64Histogram with invalid buckets",

fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},

wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Int64ObservableCounter with no validation issues",

Expand Down Expand Up @@ -670,6 +682,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {

wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Float64Histogram with invalid buckets",

fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Float64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},

wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Float64ObservableCounter with no validation issues",

Expand Down Expand Up @@ -1970,3 +1993,63 @@ func TestMalformedSelectors(t *testing.T) {
})
}
}

func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
defaultBuckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
aggregationSelector := func(InstrumentKind) Aggregation {
return AggregationExplicitBucketHistogram{Boundaries: []float64{0, 1, 2, 3, 4, 5}}
}
for _, tt := range []struct {
desc string
reader Reader
views []View
histogramOpts []metric.Float64HistogramOption
expectedBucketBoundaries []float64
}{
{
desc: "default",
reader: NewManualReader(),
expectedBucketBoundaries: defaultBuckets,
},
{
desc: "custom reader aggregation overrides default",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
expectedBucketBoundaries: []float64{0, 1, 2, 3, 4, 5},
},
{
desc: "overridden by histogram option",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
histogramOpts: []metric.Float64HistogramOption{
metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
},
expectedBucketBoundaries: []float64{0, 2, 4, 6, 8, 10},
},
{
desc: "overridden by view",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
histogramOpts: []metric.Float64HistogramOption{
metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
},
views: []View{NewView(Instrument{Name: "*"}, Stream{
Aggregation: AggregationExplicitBucketHistogram{Boundaries: []float64{0, 3, 6, 9, 12, 15}},
})},
expectedBucketBoundaries: []float64{0, 3, 6, 9, 12, 15},
},
} {
t.Run(tt.desc, func(t *testing.T) {
meter := NewMeterProvider(WithView(tt.views...), WithReader(tt.reader)).Meter("TestHistogramBucketPrecedenceOrdering")
sfHistogram, err := meter.Float64Histogram("sync.float64.histogram", tt.histogramOpts...)
require.NoError(t, err)
sfHistogram.Record(context.Background(), 1)
var rm metricdata.ResourceMetrics
err = tt.reader.Collect(context.Background(), &rm)
require.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
gotHist, ok := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
require.True(t, ok)
require.Len(t, gotHist.DataPoints, 1)
assert.Equal(t, tt.expectedBucketBoundaries, gotHist.DataPoints[0].Bounds)
})
}
}
81 changes: 56 additions & 25 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
var (
matched bool
measures []aggregate.Measure[N]
Expand All @@ -245,8 +245,7 @@
continue
}
matched = true

in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -271,7 +270,7 @@
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
Expand All @@ -291,6 +290,31 @@
Err error
}

// readerDefaultAggregation returns the default aggregation for the instrument
// kind based on the reader's aggregation preferences. This is used unless the
// aggregation is overridden with a view.
func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
aggregation := i.pipeline.reader.aggregation(kind)
switch aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
aggregation = aggregation.copy()
if err := aggregation.err(); err != nil {
orig := aggregation
aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", aggregation,
)
}

Check warning on line 313 in sdk/metric/pipeline.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/pipeline.go#L306-L313

Added lines #L306 - L313 were not covered by tests
}
return aggregation
}

// cachedAggregator returns the appropriate aggregate input and output
// functions for an instrument configuration. If the exact instrument has been
// created within the inst.Scope, those aggregate function instances will be
Expand All @@ -305,29 +329,14 @@
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
stream.Aggregation = stream.Aggregation.copy()
if err := stream.Aggregation.err(); err != nil {
orig := stream.Aggregation
stream.Aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", stream.Aggregation,
)
}
}
// The aggregation was not overridden with a view. Use the aggregation
// provided by the reader.
stream.Aggregation = readerAggregation
case AggregationDefault:
// The view explicitly requested the default aggregation.

Check warning on line 339 in sdk/metric/pipeline.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/pipeline.go#L339

Added line #L339 was not covered by tests
stream.Aggregation = DefaultAggregationSelector(kind)
}

Expand Down Expand Up @@ -596,7 +605,29 @@

errs := &multierror{}
for _, i := range r.inserters {
in, err := i.Instrument(id)
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}

// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
}
Expand Down
Loading