Skip to content

Commit

Permalink
Native histogram performance (#4244)
Browse files Browse the repository at this point in the history
* Include native histograms in benchmark

* Fix tests

* Reduce calls to Labels()

* Clean up todos

* Reduce function signature on classicHistograms

* Add additional TODOs
  • Loading branch information
zalegrala authored Nov 1, 2024
1 parent d035888 commit ffeaac7
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 56 deletions.
2 changes: 1 addition & 1 deletion modules/generator/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func BenchmarkCollect(b *testing.B) {
spanMetricsDimensions: []string{"k8s.cluster.name", "k8s.namespace.name"},
spanMetricsEnableTargetInfo: true,
spanMetricsTargetInfoExcludedDimensions: []string{"excluded}"},
// nativeHistograms: overrides.HistogramMethodBoth,
nativeHistograms: overrides.HistogramMethodBoth,
}
)

Expand Down
3 changes: 2 additions & 1 deletion modules/generator/registry/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
// add metric name
baseLabels = append(baseLabels, labels.Label{Name: labels.MetricName, Value: c.metricName})

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range c.series {
Expand Down Expand Up @@ -166,7 +167,7 @@ func (c *counter) collectMetrics(appender storage.Appender, timeMs int64, extern
return
}

// TODO support exemplars
// TODO: support exemplars
}

return
Expand Down
3 changes: 2 additions & 1 deletion modules/generator/registry/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external
baseLabels = append(baseLabels, labels.Label{Name: name, Value: value})
}

// TODO: avoid allocation on each collection
lb := labels.NewBuilder(baseLabels)

for _, s := range g.series {
Expand All @@ -166,7 +167,7 @@ func (g *gauge) collectMetrics(appender storage.Appender, timeMs int64, external
if err != nil {
return
}
// TODO support exemplars
// TODO: support exemplars
}

return
Expand Down
122 changes: 72 additions & 50 deletions modules/generator/registry/native_histogram.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package registry

import (
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -39,11 +40,19 @@ type nativeHistogram struct {
// generate. A diff in the configured value on the processors will cause a
// reload of the process, and a new instance of the histogram to be created.
histogramOverride HistogramMode

externalLabels map[string]string

// classic
nameCount string
nameSum string
nameBucket string
}

type nativeHistogramSeries struct {
// labels should not be modified after creation
labels LabelPair
lb *labels.Builder
labels labels.Labels
promHistogram prometheus.Histogram
lastUpdated int64
histogram *dto.Histogram
Expand All @@ -53,6 +62,11 @@ type nativeHistogramSeries struct {
// This avoids Prometheus throwing away the first value in the series,
// due to the transition from null -> x.
firstSeries *atomic.Bool

// classic
countLabels labels.Labels
sumLabels labels.Labels
// bucketLabels []labels.Labels
}

func (hs *nativeHistogramSeries) isNew() bool {
Expand All @@ -68,7 +82,7 @@ var (
_ metric = (*nativeHistogram)(nil)
)

func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode) *nativeHistogram {
func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32) bool, onRemoveSeries func(count uint32), traceIDLabelName string, histogramOverride HistogramMode, externalLabels map[string]string) *nativeHistogram {
if onAddSeries == nil {
onAddSeries = func(uint32) bool {
return true
Expand All @@ -90,6 +104,12 @@ func newNativeHistogram(name string, buckets []float64, onAddSeries func(uint32)
traceIDLabelName: traceIDLabelName,
buckets: buckets,
histogramOverride: histogramOverride,
externalLabels: externalLabels,

// classic
nameCount: fmt.Sprintf("%s_count", name),
nameSum: fmt.Sprintf("%s_sum", name),
nameBucket: fmt.Sprintf("%s_bucket", name),
}
}

Expand All @@ -109,19 +129,15 @@ func (h *nativeHistogram) ObserveWithExemplar(labelValueCombo *LabelValueCombo,
return
}

newSeries := h.newSeries(labelValueCombo, value, traceID, multiplier)
h.series[hash] = newSeries
h.series[hash] = h.newSeries(labelValueCombo, value, traceID, multiplier)
}

func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value float64, traceID string, multiplier float64) *nativeHistogramSeries {
newSeries := &nativeHistogramSeries{
// TODO move these labels in HistogramOpts.ConstLabels?
labels: labelValueCombo.getLabelPair(),
promHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: h.name(),
Help: "Native histogram for metric " + h.name(),
Buckets: h.buckets,
// TODO check if these values are sensible and break them out
Name: h.name(),
Help: "Native histogram for metric " + h.name(),
Buckets: h.buckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 15 * time.Minute,
Expand All @@ -132,6 +148,31 @@ func (h *nativeHistogram) newSeries(labelValueCombo *LabelValueCombo, value floa

h.updateSeries(newSeries, value, traceID, multiplier)

lbls := labelValueCombo.getLabelPair()
lb := labels.NewBuilder(make(labels.Labels, 1+len(lbls.names)))

// set series labels
for i, name := range lbls.names {
lb.Set(name, lbls.values[i])
}
// set external labels
for name, value := range h.externalLabels {
lb.Set(name, value)
}

lb.Set(labels.MetricName, h.metricName)

newSeries.labels = lb.Labels()
newSeries.lb = lb

// _count
lb.Set(labels.MetricName, h.nameCount)
newSeries.countLabels = lb.Labels()

// _sum
lb.Set(labels.MetricName, h.nameSum)
newSeries.sumLabels = lb.Labels()

return newSeries
}

Expand All @@ -149,28 +190,13 @@ func (h *nativeHistogram) name() string {
return h.metricName
}

func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, externalLabels map[string]string) (activeSeries int, err error) {
func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64, _ map[string]string) (activeSeries int, err error) {
h.seriesMtx.Lock()
defer h.seriesMtx.Unlock()

lbls := make(labels.Labels, 1+len(externalLabels))
lb := labels.NewBuilder(lbls)
activeSeries = 0

lb.Set(labels.MetricName, h.metricName)

// set external labels
for name, value := range externalLabels {
lb.Set(name, value)
}

for _, s := range h.series {

// Set series-specific labels
for i, name := range s.labels.names {
lb.Set(name, s.labels.values[i])
}

// Extract histogram
encodedMetric := &dto.Metric{}

Expand All @@ -180,14 +206,15 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
return activeSeries, err
}

// NOTE: Store the encoded histogram here so we can keep track of the exemplars
// that have been sent. The value is updated here, but the pointers remain
// the same, and so Reset() call below can be used to clear the exemplars.
// NOTE: Store the encoded histogram here so we can keep track of the
// exemplars that have been sent. The value is updated here, but the
// pointers remain the same, and so Reset() call below can be used to clear
// the exemplars.
s.histogram = encodedMetric.GetHistogram()

// If we are in "both" or "classic" mode, also emit classic histograms.
if hasClassicHistograms(h.histogramOverride) {
classicSeries, classicErr := h.classicHistograms(appender, lb, timeMs, s)
classicSeries, classicErr := h.classicHistograms(appender, timeMs, s)
if classicErr != nil {
return activeSeries, classicErr
}
Expand All @@ -196,7 +223,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64

// If we are in "both" or "native" mode, also emit native histograms.
if hasNativeHistograms(h.histogramOverride) {
nativeErr := h.nativeHistograms(appender, lb, timeMs, s)
nativeErr := h.nativeHistograms(appender, s.labels, timeMs, s)
if nativeErr != nil {
return activeSeries, nativeErr
}
Expand All @@ -215,7 +242,7 @@ func (h *nativeHistogram) collectMetrics(appender storage.Appender, timeMs int64
Ts: convertTimestampToMs(encodedExemplar.GetTimestamp()),
HasTs: true,
}
_, err = appender.AppendExemplar(0, lb.Labels(), e)
_, err = appender.AppendExemplar(0, s.labels, e)
if err != nil {
return activeSeries, err
}
Expand Down Expand Up @@ -243,7 +270,7 @@ func (h *nativeHistogram) activeSeriesPerHistogramSerie() uint32 {
return 1
}

func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (err error) {
func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lbls labels.Labels, timeMs int64, s *nativeHistogramSeries) (err error) {
// Decode to Prometheus representation
hist := promhistogram.Histogram{
Schema: s.histogram.GetSchema(),
Expand Down Expand Up @@ -273,66 +300,61 @@ func (h *nativeHistogram) nativeHistograms(appender storage.Appender, lb *labels
}
hist.NegativeBuckets = s.histogram.NegativeDelta

lb.Set(labels.MetricName, h.metricName)
_, err = appender.AppendHistogram(0, lb.Labels(), timeMs, &hist, nil)
_, err = appender.AppendHistogram(0, lbls, timeMs, &hist, nil)
if err != nil {
return err
}

return
}

func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *labels.Builder, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) {
func (h *nativeHistogram) classicHistograms(appender storage.Appender, timeMs int64, s *nativeHistogramSeries) (activeSeries int, err error) {
if s.isNew() {
lb.Set(labels.MetricName, h.metricName+"_count")
endOfLastMinuteMs := getEndOfLastMinuteMs(timeMs)
_, err = appender.Append(0, lb.Labels(), endOfLastMinuteMs, 0)
_, err = appender.Append(0, s.countLabels, endOfLastMinuteMs, 0)
if err != nil {
return activeSeries, err
}
s.registerSeenSeries()
}

// sum
lb.Set(labels.MetricName, h.metricName+"_sum")
_, err = appender.Append(0, lb.Labels(), timeMs, s.histogram.GetSampleSum())
_, err = appender.Append(0, s.sumLabels, timeMs, s.histogram.GetSampleSum())
if err != nil {
return activeSeries, err
}
activeSeries++

// count
lb.Set(labels.MetricName, h.metricName+"_count")
_, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
_, err = appender.Append(0, s.countLabels, timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
if err != nil {
return activeSeries, err
}
activeSeries++

// bucket
lb.Set(labels.MetricName, h.metricName+"_bucket")
s.lb.Set(labels.MetricName, h.metricName+"_bucket")

// the Prometheus histogram will sometimes add the +Inf bucket, it depends on whether there is an exemplar
// for that bucket or not. To avoid adding it twice, keep track of it with this boolean.
infBucketWasAdded := false

for _, bucket := range s.histogram.Bucket {
// add "le" label
lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound()))
s.lb.Set(labels.BucketLabel, formatFloat(bucket.GetUpperBound()))

if bucket.GetUpperBound() == math.Inf(1) {
infBucketWasAdded = true
}

ref, appendErr := appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount()))
ref, appendErr := appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(bucket.GetCumulativeCountFloat(), bucket.GetCumulativeCount()))
if appendErr != nil {
return activeSeries, appendErr
}
activeSeries++

if bucket.Exemplar != nil && len(bucket.Exemplar.Label) > 0 {
// TODO are we appending the same exemplar twice?
_, err = appender.AppendExemplar(ref, lb.Labels(), exemplar.Exemplar{
_, err = appender.AppendExemplar(ref, s.lb.Labels(), exemplar.Exemplar{
Labels: convertLabelPairToLabels(bucket.Exemplar.GetLabel()),
Value: bucket.Exemplar.GetValue(),
Ts: timeMs,
Expand All @@ -346,17 +368,17 @@ func (h *nativeHistogram) classicHistograms(appender storage.Appender, lb *label

if !infBucketWasAdded {
// Add +Inf bucket
lb.Set(labels.BucketLabel, "+Inf")
s.lb.Set(labels.BucketLabel, "+Inf")

_, err = appender.Append(0, lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
_, err = appender.Append(0, s.lb.Labels(), timeMs, getIfGreaterThenZeroOr(s.histogram.GetSampleCountFloat(), s.histogram.GetSampleCount()))
if err != nil {
return activeSeries, err
}
activeSeries++
}

// drop "le" label again
lb.Del(labels.BucketLabel)
s.lb.Del(labels.BucketLabel)

return
}
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/registry/native_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Test_ObserveWithExemplar_duplicate(t *testing.T) {
return true
}

h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth)
h := newNativeHistogram("my_histogram", []float64{0.1, 0.2}, onAdd, nil, "trace_id", HistogramModeBoth, nil)

lv := newLabelValueCombo([]string{"label"}, []string{"value-1"})

Expand Down Expand Up @@ -463,7 +463,7 @@ func Test_Histograms(t *testing.T) {
}

onAdd := func(uint32) bool { return true }
h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth)
h := newNativeHistogram("test_histogram", tc.buckets, onAdd, nil, "trace_id", HistogramModeBoth, nil)
testHistogram(t, h, tc.collections)
})
})
Expand Down
2 changes: 1 addition & 1 deletion modules/generator/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *ManagedRegistry) NewHistogram(name string, buckets []float64, histogram
// are disabled, eventually the new implementation can handle all cases

if hasNativeHistograms(histogramOverride) {
h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride)
h = newNativeHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, histogramOverride, r.externalLabels)
} else {
h = newHistogram(name, buckets, r.onAddMetricSeries, r.onRemoveMetricSeries, traceIDLabelName, r.externalLabels)
}
Expand Down

0 comments on commit ffeaac7

Please sign in to comment.