Commit

Arbitrary span duration metrics from any TraceQL-compatible block (grafana#2418)

* Initial working version

* lint

* Record errors per group

* add combine method
mdisibio authored May 4, 2023
1 parent 2344e4a commit 3f562c6
Showing 4 changed files with 435 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/traceql/engine.go
@@ -26,6 +26,20 @@ func NewEngine() *Engine {
}
}

func (e *Engine) Compile(query string) (func(input []*Spanset) (result []*Spanset, err error), *FetchSpansRequest, error) {
expr, err := Parse(query)
if err != nil {
return nil, nil, err
}

req := &FetchSpansRequest{
AllConditions: true,
}
expr.Pipeline.extractConditions(req)

return expr.Pipeline.evaluate, req, nil
}

func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchRequest, spanSetFetcher SpansetFetcher) (*tempopb.SearchResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "traceql.Engine.ExecuteSearch")
defer span.Finish()
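
For context, here is a minimal sketch of how a caller might use the new Compile entry point. It is not part of this commit; the query string and the empty candidates slice are illustrative only.

package example

import (
	"fmt"
	"log"

	"github.com/grafana/tempo/pkg/traceql"
)

func compileSketch() {
	// Compile once; reuse the evaluator for every candidate spanset.
	eval, req, err := traceql.NewEngine().Compile(`{ duration > 100ms }`)
	if err != nil {
		log.Fatal(err)
	}

	// req lists the conditions the storage layer must fetch so that the
	// evaluator can apply the remaining query logic.
	fmt.Println(len(req.Conditions), req.AllConditions)

	// Candidate spansets would normally come from a SpansetFetcher.
	var candidates []*traceql.Spanset
	out, err := eval(candidates)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(out))
}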
213 changes: 213 additions & 0 deletions pkg/traceqlmetrics/metrics.go
@@ -0,0 +1,213 @@
package traceqlmetrics

import (
"context"
"io"
"math"

"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
"github.com/pkg/errors"
)

type latencyHistogram struct {
buckets [64]int // Exponential buckets, powers of 2
}

func (m *latencyHistogram) Record(durationNanos uint64) {
// Increment bucket that matches log2(duration)
var bucket int
if durationNanos >= 2 {
bucket = int(math.Ceil(math.Log2(float64(durationNanos))))
}
m.buckets[bucket]++
}

func (m *latencyHistogram) Count() int {
total := 0
for _, count := range m.buckets {
total += count
}
return total
}

func (m *latencyHistogram) Combine(other latencyHistogram) {
for i := range m.buckets {
m.buckets[i] += other.buckets[i]
}
}

// Percentile returns the estimated latency percentile in nanoseconds.
func (m *latencyHistogram) Percentile(p float32) uint64 {

// Maximum number of samples to include. We round up to better handle
// percentiles on low sample counts (<100).
maxSamples := int(math.Ceil(float64(p) * float64(m.Count())))

// Find the bucket where the percentile falls in
// and the total sample count less than or equal
// to that bucket.
var total, bucket int
for b, count := range m.buckets {
if total+count < maxSamples {
bucket = b
total += count
continue
}

// We have enough
break
}

// Fraction to interpolate between buckets, sample-count wise.
// 0.5 means halfway
interp := float64(maxSamples-total) / float64(m.buckets[bucket+1])

// Exponential interpolation between buckets
minDur := math.Pow(2, float64(bucket))
dur := minDur * math.Pow(2, interp)

return uint64(dur)
}

type MetricsResults struct {
Estimated bool
SpanCount int
Series map[traceql.Static]*latencyHistogram
Errors map[traceql.Static]int
}

func NewMetricsResults() *MetricsResults {
return &MetricsResults{
Series: map[traceql.Static]*latencyHistogram{},
Errors: map[traceql.Static]int{},
}
}

func (m *MetricsResults) Record(series traceql.Static, durationNanos uint64, err bool) {
s := m.Series[series]
if s == nil {
s = &latencyHistogram{}
m.Series[series] = s
}
s.Record(durationNanos)

if err {
m.Errors[series]++
}
}

func (m *MetricsResults) Combine(other *MetricsResults) {

m.SpanCount += other.SpanCount
if other.Estimated {
m.Estimated = true
}

for k, v := range other.Series {
s := m.Series[k]
if s == nil {
s = &latencyHistogram{}
m.Series[k] = s
}
s.Combine(*v)
}

for k, v := range other.Errors {
m.Errors[k] += v
}
}

// GetMetrics runs the given TraceQL query against the fetcher and returns
// per-group latency histograms and error counts, grouped by the groupBy
// attribute. Processing stops after spanLimit matching spans.
func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int, fetcher traceql.SpansetFetcher) (*MetricsResults, error) {
groupByAttr, err := traceql.ParseIdentifier(groupBy)
if err != nil {
return nil, errors.Wrap(err, "parsing groupby")
}

eval, req, err := traceql.NewEngine().Compile(query)
if err != nil {
return nil, errors.Wrap(err, "compiling query")
}

var (
duration = traceql.NewIntrinsic(traceql.IntrinsicDuration)
status = traceql.NewIntrinsic(traceql.IntrinsicStatus)
statusErr = traceql.NewStaticStatus(traceql.StatusError)
spanCount = 0
series = NewMetricsResults()
)

// Ensure that we select the span duration, status, and group-by attribute
// if they are not already included in the query. These are fetched
// without filtering.
addConditionIfNotPresent := func(a traceql.Attribute) {
for _, c := range req.Conditions {
if c.Attribute == a {
return
}
}

req.Conditions = append(req.Conditions, traceql.Condition{Attribute: a})
}
addConditionIfNotPresent(duration)
addConditionIfNotPresent(status)
addConditionIfNotPresent(groupByAttr)

// This filter callback processes the matching spans into the
// bucketed metrics. It returns nil because we don't need any
// results after this.
req.Filter = func(in *traceql.Spanset) ([]*traceql.Spanset, error) {

// Run engine to assert final query conditions
out, err := eval([]*traceql.Spanset{in})
if err != nil {
return nil, err
}

for _, ss := range out {
for _, s := range ss.Spans {

var (
attr = s.Attributes()
group = attr[groupByAttr]
err = attr[status] == statusErr
)

series.Record(group, s.DurationNanos(), err)

spanCount++
if spanCount >= spanLimit {
return nil, io.EOF
}
}
}
return nil, nil
}

// Perform the fetch and process the results inside the Filter
// callback. No actual results will be returned from this fetch call,
// but we still need to call Next() at least once.
res, err := fetcher.Fetch(ctx, *req)
if err == util.ErrUnsupported {
return nil, nil
}
if err != nil {
return nil, err
}

for {
ss, err := res.Results.Next(ctx)
if err != nil {
return nil, err
}
if ss == nil {
break
}
}

// The results are estimated if we bailed early due to limit being reached.
series.Estimated = spanCount >= spanLimit
series.SpanCount = spanCount
return series, nil
}
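
To make the bucketing and interpolation above concrete, a small standalone example (not part of the commit) reproduces the arithmetic used by Record and Percentile:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Record: a 3000ns duration lands in bucket ceil(log2(3000)) = 12,
	// i.e. the bucket whose upper bound is 2^12 = 4096ns.
	bucket := int(math.Ceil(math.Log2(3000)))
	fmt.Println(bucket) // 12

	// Percentile: suppose 5 samples sit in the 2048ns bucket and 5 in the
	// 4096ns bucket. p75 of 10 samples needs ceil(0.75*10) = 8 samples:
	// 5 come from the lower bucket, 3 of the next bucket's 5 are needed,
	// so interp = 3/5 and the estimate is 2048 * 2^0.6 ≈ 3104ns.
	est := math.Pow(2, 11) * math.Pow(2, 3.0/5.0)
	fmt.Println(uint64(est)) // 3104
}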
123 changes: 123 additions & 0 deletions pkg/traceqlmetrics/metrics_test.go
@@ -0,0 +1,123 @@
package traceqlmetrics

import (
"context"
"testing"

"github.com/grafana/tempo/pkg/traceql"
"github.com/stretchr/testify/require"
)

func TestPercentile(t *testing.T) {

testCases := []struct {
name string
durations []uint64
quartile float32
value uint64
}{
{
name: "easy mode",
durations: []uint64{2, 4, 6, 8},
quartile: 0.5,
value: uint64(4),
},
{
// 10 samples
// p75 means 7.5 samples, rounds up to 8
// 5 samples from the 2048 bucket
// 3 samples from the 4096 bucket
// interpolation: 3/5ths from 2048 to 4096 exponentially
// = 2048 * 2^0.6 = 3104.1875...
name: "interpolate between buckets",
durations: []uint64{2000, 2000, 2000, 2000, 2000, 4000, 4000, 4000, 4000, 4000},
quartile: 0.75,
value: uint64(3104),
},
}

for _, tc := range testCases {
m := &latencyHistogram{}
for _, d := range tc.durations {
m.Record(d)
}
got := m.Percentile(tc.quartile)
require.Equal(t, tc.value, got, tc.name)
}
}

func TestMetricsResultsCombine(t *testing.T) {
a := traceql.NewStaticString("1")
b := traceql.NewStaticString("2")
c := traceql.NewStaticString("3")

m := NewMetricsResults()
m.Record(a, 1, true)
m.Record(b, 1, false)
m.Record(b, 1, false)
m.Record(b, 1, true)

m2 := NewMetricsResults()
m2.Record(b, 1, true)
m2.Record(c, 1, false)
m2.Record(c, 1, false)
m2.Record(c, 1, true)

m.Combine(m2)

require.Equal(t, 3, len(m.Series))
require.Equal(t, 3, len(m.Errors))

require.Equal(t, 1, m.Series[a].Count())
require.Equal(t, 4, m.Series[b].Count())
require.Equal(t, 3, m.Series[c].Count())

require.Equal(t, 1, m.Errors[a])
require.Equal(t, 2, m.Errors[b])
require.Equal(t, 1, m.Errors[c])
}

func TestGetMetrics(t *testing.T) {

ctx := context.TODO()
query := "{}"
groupBy := "span.foo"

m := &mockFetcher{
Spansets: []*traceql.Spanset{
{
Spans: []traceql.Span{
newMockSpan(128, "span.foo", "1"),
newMockSpan(128, "span.foo", "1"), // p50 for foo=1
newMockSpan(256, "span.foo", "1"),
newMockSpan(256, "span.foo", "1"),
newMockSpan(256, "span.foo", "2"),
newMockSpan(256, "span.foo", "2"), // p50 for foo=2
newMockSpan(512, "span.foo", "2"),
newMockSpan(512, "span.foo", "2").WithErr(),
},
},
},
}

res, err := GetMetrics(ctx, query, groupBy, 1000, m)
require.NoError(t, err)
require.NotNil(t, res)

one := traceql.NewStaticString("1")
two := traceql.NewStaticString("2")

require.Equal(t, 0, res.Errors[one])
require.Equal(t, 1, res.Errors[two])

require.NotNil(t, res.Series[one])
require.NotNil(t, res.Series[two])

require.Equal(t, uint64(128), res.Series[one].Percentile(0.5)) // p50
require.Equal(t, uint64(181), res.Series[one].Percentile(0.75)) // p75, 128 * 2^0.5 = 181
require.Equal(t, uint64(256), res.Series[one].Percentile(1.0)) // p100

require.Equal(t, uint64(256), res.Series[two].Percentile(0.5)) // p50
require.Equal(t, uint64(362), res.Series[two].Percentile(0.75)) // p75, 256 * 2^0.5 = 362
require.Equal(t, uint64(512), res.Series[two].Percentile(1.0)) // p100
}
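
Beyond the mock fetcher used in these tests, any traceql.SpansetFetcher can drive GetMetrics. A hedged caller sketch follows; the query, group-by attribute, and helper name are placeholders, not part of this commit.

package example

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/pkg/traceql"
	"github.com/grafana/tempo/pkg/traceqlmetrics"
)

// routeLatencies is a hypothetical helper, not part of the commit.
func routeLatencies(ctx context.Context, fetcher traceql.SpansetFetcher) error {
	// Group latencies and error counts by HTTP route, capped at 10,000 spans.
	res, err := traceqlmetrics.GetMetrics(ctx, "{}", "span.http.route", 10000, fetcher)
	if err != nil {
		return err
	}
	if res == nil {
		// The fetcher does not support filtering (util.ErrUnsupported).
		return nil
	}
	for group, hist := range res.Series {
		fmt.Println(group, hist.Percentile(0.5), hist.Percentile(0.99), res.Errors[group])
	}
	fmt.Println("spans:", res.SpanCount, "estimated:", res.Estimated)
	return nil
}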