feat: added min_over_time TraceQL metric (#3975)
* added min_over_time traceql metric

* fix parsing

* add metricsAggregateMinOverTime to the extract conditions to fetch the attribute

* fix tests

* refactor code

* refactor FloatizeAttribute

* unused parameter warning

* added a recombiner aggregator

* amend traceql lex

* handle NaN values

* fix tests

* test that combining valid values removes default NaNs

* do not show empty time series

* refactor aggregator function

* changelog and documentation

* doc changes

* typo

* bring back quantile doc header

* fix typos
javiermolinar authored Aug 30, 2024
1 parent 1472aac commit 8c6519f
Showing 15 changed files with 834 additions and 502 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Replace Grafana Agent example with Grafana Alloy [#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
* [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)

# v2.6.0-rc.1

22 changes: 22 additions & 0 deletions docs/sources/tempo/traceql/metrics-queries.md
@@ -56,6 +56,9 @@ These functions can be added as an operator at the end of any TraceQL query.
`count_over_time`
: Counts the number of matching spans per time interval (see the `step` API parameter)

`min_over_time`
: Returns the minimum value of matching span values per time interval (see the `step` API parameter)

`quantile_over_time`
: The quantile of the values in the specified interval

@@ -91,6 +94,25 @@ down by HTTP route.
This might let you determine that `/api/sad` had a higher rate of erroring
spans than `/api/happy`, for example.

### The `count_over_time` and `min_over_time` functions

The `count_over_time()` function counts the number of matching spans per time interval.

```
{ name = "GET /:endpoint" } | count_over_time() by (span.http.status_code)
```

The `min_over_time()` function lets you aggregate numerical values by computing their minimum, such as the all-important span duration.

```
{ name = "GET /:endpoint" } | min_over_time(duration) by (span.http.target)
```

Any numerical attribute on the span is fair game.

```
{ name = "GET /:endpoint" } | min_over_time(span.http.status_code)
```

### The `quantile_over_time` and `histogram_over_time` functions

The `quantile_over_time()` and `histogram_over_time()` functions let you aggregate numerical values, such as the all-important span duration.
4 changes: 4 additions & 0 deletions modules/querier/http.go
@@ -403,6 +403,10 @@ func (q *Querier) QueryRangeHandler(w http.ResponseWriter, r *http.Request) {
errHandler(ctx, span, err)
return
}
// Prevent a panic when marshaling a nil response
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}

if resp != nil && resp.Metrics != nil {
span.SetTag("inspectedBytes", resp.Metrics.InspectedBytes)
91 changes: 40 additions & 51 deletions pkg/traceql/ast.go
@@ -1048,6 +1048,8 @@ type MetricsAggregate struct {
agg SpanAggregator
seriesAgg SeriesAggregator
exemplarFn getExemplar
// Type of operation for simple aggregation in layers 2 and 3
simpleAggregationOp SimpleAggregationOp
}

func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggregate {
@@ -1057,6 +1059,14 @@ func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggregate {
}
}

func newMetricsAggregateWithAttr(agg MetricsAggregateOp, attr Attribute, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: agg,
attr: attr,
by: by,
}
}

func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: metricsAggregateQuantileOverTime,
@@ -1066,24 +1076,13 @@ func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attribute) *MetricsAggregate {
}
}

func newMetricsAggregateHistogramOverTime(attr Attribute, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: metricsAggregateHistogramOverTime,
by: by,
attr: attr,
}
}

func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
switch a.op {
case metricsAggregateRate, metricsAggregateCountOverTime:
// No extra conditions, start time is already enough
case metricsAggregateQuantileOverTime, metricsAggregateHistogramOverTime:
if !request.HasAttribute(a.attr) {
request.SecondPassConditions = append(request.SecondPassConditions, Condition{
Attribute: a.attr,
})
}
// For metrics aggregators based on a span attribute we have to include it
includeAttribute := a.attr != (Attribute{}) && !request.HasAttribute(a.attr)
if includeAttribute {
request.SecondPassConditions = append(request.SecondPassConditions, Condition{
Attribute: a.attr,
})
}

for _, b := range a.by {
@@ -1096,16 +1095,6 @@ func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
}

func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
switch mode {
case AggregateModeSum:
a.initSum(q)
return

case AggregateModeFinal:
a.initFinal(q)
return
}

// Raw mode:

var innerAgg func() VectorAggregator
@@ -1116,12 +1105,20 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
switch a.op {
case metricsAggregateCountOverTime:
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewMinOverTimeAggregator(a.attr) }
a.simpleAggregationOp = minAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
Expand All @@ -1131,6 +1128,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
// I.e. a duration of 500ms will be in __bucket==0.512s
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
byFuncLabel = internalLabelBucket
a.simpleAggregationOp = sumAggregation
switch a.attr {
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
Expand All @@ -1142,12 +1140,22 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
// Basic implementation for all other attributes
byFunc = a.bucketizeAttribute
exemplarFn = func(s Span) (float64, uint64) {
v, _ := a.floatizeAttribute(s)
v, _ := FloatizeAttribute(s, a.attr)
return v, a.spanStartTimeMs(s)
}
}
}

switch mode {
case AggregateModeSum:
a.initSum(q)
return

case AggregateModeFinal:
a.initFinal(q)
return
}

a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
}, a.by, byFunc, byFuncLabel)
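The `simpleAggregationOp` assignments above tell layers 2 and 3 how to recombine partial series: `sumAggregation` for rate, count and histogram, `minAggregation` for `min_over_time`. The combiner that consumes this op is not part of this hunk; below is a minimal, self-contained sketch of the recombination idea only — apart from the `sumAggregation`/`minAggregation` names, everything here is illustrative rather than the actual Tempo implementation.

```go
package main

import (
	"fmt"
	"math"
)

// SimpleAggregationOp selects how partial series from lower layers are
// recombined. The two constants mirror the ops referenced in the diff.
type SimpleAggregationOp int

const (
	sumAggregation SimpleAggregationOp = iota
	minAggregation
)

// combine folds one partial series into an accumulator, sample by sample.
// NaN means "no value at this step"; a valid value must replace the NaN
// default rather than be poisoned by it (see "handle NaN values" above).
func combine(op SimpleAggregationOp, acc, in []float64) {
	for i, v := range in {
		if math.IsNaN(v) {
			continue // this partial series had nothing for the step
		}
		switch {
		case math.IsNaN(acc[i]):
			acc[i] = v // first valid value wins, regardless of op
		case op == sumAggregation:
			acc[i] += v
		case op == minAggregation:
			acc[i] = math.Min(acc[i], v)
		}
	}
}

func main() {
	nan := math.NaN()
	acc := []float64{nan, 5, nan}
	combine(minAggregation, acc, []float64{3, 7, nan})
	fmt.Println(acc) // [3 5 NaN]
}
```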
@@ -1167,28 +1175,8 @@ func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
}

func (a *MetricsAggregate) floatizeAttribute(s Span) (float64, StaticType) {
v, ok := s.AttributeFor(a.attr)
if !ok {
return 0, TypeNil
}

switch v.Type {
case TypeInt:
n, _ := v.Int()
return float64(n), v.Type
case TypeDuration:
d, _ := v.Duration()
return float64(d.Nanoseconds()), v.Type
case TypeFloat:
return v.Float(), v.Type
default:
return 0, TypeNil
}
}

func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
f, t := a.floatizeAttribute(s)
f, t := FloatizeAttribute(s, a.attr)

switch t {
case TypeInt:
Expand All @@ -1213,7 +1201,7 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
a.seriesAgg = NewSimpleAdditionCombiner(q)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}

func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
Expand All @@ -1222,7 +1210,7 @@ func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
a.seriesAgg = NewHistogramAggregator(q, a.floats)
default:
// These are simple additions by series
a.seriesAgg = NewSimpleAdditionCombiner(q)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}
}

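The `floatizeAttribute` method removed above was promoted to an exported package-level helper, called as `FloatizeAttribute(s, a.attr)` earlier in this file. The exported function lives in another file and is not shown in this diff; a sketch that simply mirrors the removed body (the actual helper in `pkg/traceql` may differ in detail):

```go
// FloatizeAttribute converts a span attribute to a float64 so numeric
// aggregators such as min_over_time can treat int, duration and float
// attributes uniformly. Mirrors the method body removed above, with the
// attribute passed in explicitly.
func FloatizeAttribute(s Span, attr Attribute) (float64, StaticType) {
	v, ok := s.AttributeFor(attr)
	if !ok {
		return 0, TypeNil
	}

	switch v.Type {
	case TypeInt:
		n, _ := v.Int()
		return float64(n), v.Type
	case TypeDuration:
		d, _ := v.Duration()
		return float64(d.Nanoseconds()), v.Type
	case TypeFloat:
		return v.Float(), v.Type
	default:
		return 0, TypeNil
	}
}
```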
@@ -1252,6 +1240,7 @@ func (a *MetricsAggregate) result() SeriesSet {
func (a *MetricsAggregate) validate() error {
switch a.op {
case metricsAggregateCountOverTime:
case metricsAggregateMinOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {
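`NewMinOverTimeAggregator(a.attr)`, referenced in `init` above, is defined in another file that is not part of this diff. A rough sketch of the idea, assuming a `VectorAggregator` contract with `Observe` and `Sample` methods (the real interface and implementation in `pkg/traceql` may differ):

```go
// minOverTimeAggregator keeps the smallest attribute value observed during
// the current step. It starts at NaN so that steps with no matching spans
// yield no sample rather than a misleading zero ("do not show empty time
// series" in the commit message).
type minOverTimeAggregator struct {
	attr Attribute
	min  float64
}

func NewMinOverTimeAggregator(attr Attribute) *minOverTimeAggregator {
	return &minOverTimeAggregator{attr: attr, min: math.NaN()}
}

func (m *minOverTimeAggregator) Observe(s Span) {
	f, t := FloatizeAttribute(s, m.attr)
	if t == TypeNil {
		return // attribute missing or not numeric
	}
	if math.IsNaN(m.min) || f < m.min {
		m.min = f
	}
}

func (m *minOverTimeAggregator) Sample() float64 {
	return m.min
}
```

With this shape, the existing grouping and step aggregators used by `count_over_time` can be reused unchanged; only the per-step reducer differs.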
4 changes: 3 additions & 1 deletion pkg/traceql/ast_stringer.go
@@ -179,9 +179,11 @@ func (a MetricsAggregate) String() string {

s.WriteString(a.op.String())
s.WriteString("(")
if a.attr != (Attribute{}) {
s.WriteString(a.attr.String())
}
switch a.op {
case metricsAggregateQuantileOverTime:
s.WriteString(a.attr.String())
s.WriteString(",")
for i, f := range a.floats {
s.WriteString(strconv.FormatFloat(f, 'f', 5, 64))
5 changes: 5 additions & 0 deletions pkg/traceql/ast_test.go
@@ -854,6 +854,11 @@ func (m *mockSpan) WithSpanString(key string, value string) *mockSpan {
return m
}

func (m *mockSpan) WithSpanInt(key string, value int) *mockSpan {
m.attributes[NewScopedAttribute(AttributeScopeSpan, false, key)] = NewStaticInt(value)
return m
}

func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
m.attributes[NewAttribute(key)] = NewStaticBool(value)
return m