diff --git a/pkg/traceql/engine.go b/pkg/traceql/engine.go
index 44574a95906..228a1b6d2be 100644
--- a/pkg/traceql/engine.go
+++ b/pkg/traceql/engine.go
@@ -26,6 +26,20 @@ func NewEngine() *Engine {
 	}
 }
 
+func (e *Engine) Compile(query string) (func(input []*Spanset) (result []*Spanset, err error), *FetchSpansRequest, error) {
+	expr, err := Parse(query)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	req := &FetchSpansRequest{
+		AllConditions: true,
+	}
+	expr.Pipeline.extractConditions(req)
+
+	return expr.Pipeline.evaluate, req, nil
+}
+
 func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchRequest, spanSetFetcher SpansetFetcher) (*tempopb.SearchResponse, error) {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "traceql.Engine.ExecuteSearch")
 	defer span.Finish()
diff --git a/pkg/traceqlmetrics/metrics.go b/pkg/traceqlmetrics/metrics.go
new file mode 100644
index 00000000000..839d2d86b3d
--- /dev/null
+++ b/pkg/traceqlmetrics/metrics.go
@@ -0,0 +1,213 @@
+package traceqlmetrics
+
+import (
+	"context"
+	"io"
+	"math"
+
+	"github.com/grafana/tempo/pkg/traceql"
+	"github.com/grafana/tempo/pkg/util"
+	"github.com/pkg/errors"
+)
+
+type latencyHistogram struct {
+	buckets [64]int // Exponential buckets, powers of 2
+}
+
+func (m *latencyHistogram) Record(durationNanos uint64) {
+	// Increment the bucket that matches log2(duration)
+	var bucket int
+	if durationNanos >= 2 {
+		bucket = int(math.Ceil(math.Log2(float64(durationNanos))))
+	}
+	m.buckets[bucket]++
+}
+
+func (m *latencyHistogram) Count() int {
+	total := 0
+	for _, count := range m.buckets {
+		total += count
+	}
+	return total
+}
+
+func (m *latencyHistogram) Combine(other latencyHistogram) {
+	for i := range m.buckets {
+		m.buckets[i] += other.buckets[i]
+	}
+}
+
+// Percentile returns the estimated latency percentile in nanoseconds.
+func (m *latencyHistogram) Percentile(p float32) uint64 {
+
+	// Maximum number of samples to include. We round up to better handle
+	// percentiles on low sample counts (<100).
+	maxSamples := int(math.Ceil(float64(p) * float64(m.Count())))
+
+	// Find the bucket where the percentile falls
+	// and the total sample count less than or equal
+	// to that bucket.
+	var total, bucket int
+	for b, count := range m.buckets {
+		if total+count < maxSamples {
+			bucket = b
+			total += count
+			continue
+		}
+
+		// We have enough
+		break
+	}
+
+	// Fraction to interpolate between buckets, sample-count wise.
+	// 0.5 means halfway.
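+	// For example, if we still need 3 of the 5 samples in the next bucket,
+	// interp is 0.6 and the result lands 60% of the way (exponentially)
+	// between the two bucket boundaries.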
+	interp := float64(maxSamples-total) / float64(m.buckets[bucket+1])
+
+	// Exponential interpolation between buckets
+	minDur := math.Pow(2, float64(bucket))
+	dur := minDur * math.Pow(2, interp)
+
+	return uint64(dur)
+}
+
+type MetricsResults struct {
+	Estimated bool
+	SpanCount int
+	Series    map[traceql.Static]*latencyHistogram
+	Errors    map[traceql.Static]int
+}
+
+func NewMetricsResults() *MetricsResults {
+	return &MetricsResults{
+		Series: map[traceql.Static]*latencyHistogram{},
+		Errors: map[traceql.Static]int{},
+	}
+}
+
+func (m *MetricsResults) Record(series traceql.Static, durationNanos uint64, err bool) {
+	s := m.Series[series]
+	if s == nil {
+		s = &latencyHistogram{}
+		m.Series[series] = s
+	}
+	s.Record(durationNanos)
+
+	if err {
+		m.Errors[series]++
+	}
+}
+
+func (m *MetricsResults) Combine(other *MetricsResults) {
+
+	m.SpanCount += other.SpanCount
+	if other.Estimated {
+		m.Estimated = true
+	}
+
+	for k, v := range other.Series {
+		s := m.Series[k]
+		if s == nil {
+			s = &latencyHistogram{}
+			m.Series[k] = s
+		}
+		s.Combine(*v)
+	}
+
+	for k, v := range other.Errors {
+		m.Errors[k] += v
+	}
+}
+
+// GetMetrics runs the given query against the fetcher, groups the matching
+// spans by the groupBy attribute, and returns per-group latency histograms
+// and error counts.
+func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int, fetcher traceql.SpansetFetcher) (*MetricsResults, error) {
+	groupByAttr, err := traceql.ParseIdentifier(groupBy)
+	if err != nil {
+		return nil, errors.Wrap(err, "parsing groupby")
+	}
+
+	eval, req, err := traceql.NewEngine().Compile(query)
+	if err != nil {
+		return nil, errors.Wrap(err, "compiling query")
+	}
+
+	var (
+		duration  = traceql.NewIntrinsic(traceql.IntrinsicDuration)
+		status    = traceql.NewIntrinsic(traceql.IntrinsicStatus)
+		statusErr = traceql.NewStaticStatus(traceql.StatusError)
+		spanCount = 0
+		series    = NewMetricsResults()
+	)
+
+	// Ensure that we select the span duration, status, and group-by attribute
+	// if they are not already included in the query. These are fetched
+	// without filtering.
+	addConditionIfNotPresent := func(a traceql.Attribute) {
+		for _, c := range req.Conditions {
+			if c.Attribute == a {
+				return
+			}
+		}
+
+		req.Conditions = append(req.Conditions, traceql.Condition{Attribute: a})
+	}
+	addConditionIfNotPresent(duration)
+	addConditionIfNotPresent(status)
+	addConditionIfNotPresent(groupByAttr)
+
+	// This filter callback processes the matching spans into the
+	// bucketed metrics. It returns nil because we don't need any
+	// results after this.
+	req.Filter = func(in *traceql.Spanset) ([]*traceql.Spanset, error) {
+
+		// Run the engine to assert the final query conditions
+		out, err := eval([]*traceql.Spanset{in})
+		if err != nil {
+			return nil, err
+		}
+
+		for _, ss := range out {
+			for _, s := range ss.Spans {
+
+				var (
+					attr  = s.Attributes()
+					group = attr[groupByAttr]
+					err   = attr[status] == statusErr
+				)
+
+				series.Record(group, s.DurationNanos(), err)
+
+				spanCount++
+				if spanCount >= spanLimit {
+					return nil, io.EOF
+				}
+			}
+		}
+		return nil, nil
+	}
+
+	// Perform the fetch and process the results inside the Filter
+	// callback. No actual results will be returned from this fetch call,
+	// but we still need to call Next() at least once.
+	res, err := fetcher.Fetch(ctx, *req)
+	if err == util.ErrUnsupported {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	for {
+		ss, err := res.Results.Next(ctx)
+		if err != nil {
+			return nil, err
+		}
+		if ss == nil {
+			break
+		}
+	}
+
+	// The results are estimated if we bailed early due to the limit being reached.
+	series.Estimated = spanCount >= spanLimit
+	series.SpanCount = spanCount
+	return series, nil
+}
diff --git a/pkg/traceqlmetrics/metrics_test.go b/pkg/traceqlmetrics/metrics_test.go
new file mode 100644
index 00000000000..4dbf489b646
--- /dev/null
+++ b/pkg/traceqlmetrics/metrics_test.go
@@ -0,0 +1,123 @@
+package traceqlmetrics
+
+import (
+	"context"
+	"testing"
+
+	"github.com/grafana/tempo/pkg/traceql"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPercentile(t *testing.T) {
+
+	testCases := []struct {
+		name      string
+		durations []uint64
+		quartile  float32
+		value     uint64
+	}{
+		{
+			name:      "easy mode",
+			durations: []uint64{2, 4, 6, 8},
+			quartile:  0.5,
+			value:     uint64(4),
+		},
+		{
+			// 10 samples
+			// p75 means 7.5 samples, which rounds up to 8
+			// 5 samples from the 2048 bucket
+			// 3 samples from the 4096 bucket
+			// interpolation: 3/5ths from 2048 to 4096 exponentially
+			// = 2048 * 2^0.6 = 3104.1875...
+			name:      "interpolate between buckets",
+			durations: []uint64{2000, 2000, 2000, 2000, 2000, 4000, 4000, 4000, 4000, 4000},
+			quartile:  0.75,
+			value:     uint64(3104),
+		},
+	}
+
+	for _, tc := range testCases {
+		m := &latencyHistogram{}
+		for _, d := range tc.durations {
+			m.Record(d)
+		}
+		got := m.Percentile(tc.quartile)
+		require.Equal(t, tc.value, got, tc.name)
+	}
+}
+
+func TestMetricsResultsCombine(t *testing.T) {
+	a := traceql.NewStaticString("1")
+	b := traceql.NewStaticString("2")
+	c := traceql.NewStaticString("3")
+
+	m := NewMetricsResults()
+	m.Record(a, 1, true)
+	m.Record(b, 1, false)
+	m.Record(b, 1, false)
+	m.Record(b, 1, true)
+
+	m2 := NewMetricsResults()
+	m2.Record(b, 1, true)
+	m2.Record(c, 1, false)
+	m2.Record(c, 1, false)
+	m2.Record(c, 1, true)
+
+	m.Combine(m2)
+
+	require.Equal(t, 3, len(m.Series))
+	require.Equal(t, 3, len(m.Errors))
+
+	require.Equal(t, 1, m.Series[a].Count())
+	require.Equal(t, 4, m.Series[b].Count())
+	require.Equal(t, 3, m.Series[c].Count())
+
+	require.Equal(t, 1, m.Errors[a])
+	require.Equal(t, 2, m.Errors[b])
+	require.Equal(t, 1, m.Errors[c])
+}
+
+func TestGetMetrics(t *testing.T) {
+
+	ctx := context.TODO()
+	query := "{}"
+	groupBy := "span.foo"
+
+	m := &mockFetcher{
+		Spansets: []*traceql.Spanset{
+			{
+				Spans: []traceql.Span{
+					newMockSpan(128, "span.foo", "1"),
+					newMockSpan(128, "span.foo", "1"), // p50 for foo=1
+					newMockSpan(256, "span.foo", "1"),
+					newMockSpan(256, "span.foo", "1"),
+					newMockSpan(256, "span.foo", "2"),
+					newMockSpan(256, "span.foo", "2"), // p50 for foo=2
+					newMockSpan(512, "span.foo", "2"),
+					newMockSpan(512, "span.foo", "2").WithErr(),
+				},
+			},
+		},
+	}
+
+	res, err := GetMetrics(ctx, query, groupBy, 1000, m)
+	require.NoError(t, err)
+	require.NotNil(t, res)
+
+	one := traceql.NewStaticString("1")
+	two := traceql.NewStaticString("2")
+
+	require.Equal(t, 0, res.Errors[one])
+	require.Equal(t, 1, res.Errors[two])
+
+	require.NotNil(t, res.Series[one])
+	require.NotNil(t, res.Series[two])
+
+	require.Equal(t, uint64(128), res.Series[one].Percentile(0.5))  // p50
+	require.Equal(t, uint64(181), res.Series[one].Percentile(0.75)) // p75, 128 * 2^0.5 = 181
+	require.Equal(t, uint64(256), res.Series[one].Percentile(1.0))  // p100
+
+	require.Equal(t, uint64(256), res.Series[two].Percentile(0.5))  // p50
+	require.Equal(t, uint64(362), res.Series[two].Percentile(0.75)) // p75, 256 * 2^0.5 = 362
+	require.Equal(t, uint64(512), res.Series[two].Percentile(1.0))  // p100
+}
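+
+// TestLatencyHistogramBuckets is an illustrative sketch of the exponential
+// bucketing: 1024ns is an exact power of two and reports its own value at
+// p100, while 1025ns rounds up into the next bucket and reports 2048ns.
+func TestLatencyHistogramBuckets(t *testing.T) {
+	exact := &latencyHistogram{}
+	exact.Record(1024) // ceil(log2(1024)) = 10
+	require.Equal(t, uint64(1024), exact.Percentile(1.0))
+
+	rounded := &latencyHistogram{}
+	rounded.Record(1025) // ceil(log2(1025)) = 11
+	require.Equal(t, uint64(2048), rounded.Percentile(1.0))
+}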
diff --git a/pkg/traceqlmetrics/mocks.go b/pkg/traceqlmetrics/mocks.go
new file mode 100644
index 00000000000..66242b9f3b9
--- /dev/null
+++ b/pkg/traceqlmetrics/mocks.go
@@ -0,0 +1,85 @@
+package traceqlmetrics
+
+import (
+	"context"
+
+	"github.com/grafana/tempo/pkg/traceql"
+)
+
+type mockSpan struct {
+	duration uint64
+	attrs    map[traceql.Attribute]traceql.Static
+}
+
+var _ traceql.Span = (*mockSpan)(nil)
+
+func newMockSpan(duration uint64, nameValuePairs ...string) *mockSpan {
+	m := &mockSpan{
+		duration: duration,
+		attrs:    map[traceql.Attribute]traceql.Static{},
+	}
+
+	for i := 0; i < len(nameValuePairs); i += 2 {
+		attr := traceql.MustParseIdentifier(nameValuePairs[i])
+		value := traceql.NewStaticString(nameValuePairs[i+1])
+		m.attrs[attr] = value
+	}
+
+	return m
+}
+
+func (m *mockSpan) WithErr() *mockSpan {
+	m.attrs[traceql.NewIntrinsic(traceql.IntrinsicStatus)] = traceql.NewStaticStatus(traceql.StatusError)
+	return m
+}
+
+func (m *mockSpan) Attributes() map[traceql.Attribute]traceql.Static { return m.attrs }
+func (m *mockSpan) ID() []byte                                       { return nil }
+func (m *mockSpan) StartTimeUnixNanos() uint64                       { return 0 }
+func (m *mockSpan) DurationNanos() uint64                            { return m.duration }
+
+type mockFetcher struct {
+	filter   traceql.FilterSpans
+	Spansets []*traceql.Spanset
+}
+
+var _ traceql.SpansetFetcher = (*mockFetcher)(nil)
+var _ traceql.SpansetIterator = (*mockFetcher)(nil)
+
+func (m *mockFetcher) Fetch(_ context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
+	m.filter = req.Filter
+	return traceql.FetchSpansResponse{
+		Results: m,
+	}, nil
+}
+
+func (m *mockFetcher) Next(context.Context) (*traceql.Spanset, error) {
+	if len(m.Spansets) == 0 {
+		return nil, nil
+	}
+
+	// Pop first
+	s := m.Spansets[0]
+	m.Spansets = m.Spansets[1:]
+
+	// Return as-is
+	if m.filter == nil {
+		return s, nil
+	}
+
+	// Run through filter which may return multiple
+	ss, err := m.filter(s)
+	if err != nil {
+		return nil, err
+	}
+
+	// Just return the first - this will need to change if we ever use
+	// this mock for more advanced stuff
+	if len(ss) == 0 {
+		return nil, nil
+	}
+
+	return ss[0], nil
+}
+
+func (m *mockFetcher) Close() {}
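A minimal sketch of how a caller might wire this up, for reference only. The fetcher, query, group-by attribute, and span limit below are placeholder assumptions; only GetMetrics, MetricsResults, and the latencyHistogram methods come from the change above.

package example // illustrative only

import (
	"context"
	"fmt"

	"github.com/grafana/tempo/pkg/traceql"
	"github.com/grafana/tempo/pkg/traceqlmetrics"
)

// PrintLatencyByService groups spans by service name and prints p50/p99 per
// group. The fetcher is assumed to be any traceql.SpansetFetcher
// implementation, e.g. a backend block.
func PrintLatencyByService(ctx context.Context, fetcher traceql.SpansetFetcher) error {
	res, err := traceqlmetrics.GetMetrics(ctx, "{}", "resource.service.name", 10000, fetcher)
	if err != nil {
		return err
	}
	if res == nil {
		// The fetcher does not support the required options (util.ErrUnsupported).
		return nil
	}

	for group, hist := range res.Series {
		fmt.Printf("%v: spans=%d p50=%dns p99=%dns errors=%d\n",
			group, hist.Count(), hist.Percentile(0.5), hist.Percentile(0.99), res.Errors[group])
	}
	return nil
}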