feat: added min_over_time TraceQL metric #3975

Merged
merged 22 commits on Aug 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
* [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)

# v2.6.0-rc.1

23 changes: 22 additions & 1 deletion docs/sources/tempo/traceql/metrics-queries.md
@@ -56,6 +56,9 @@ These functions can be added as an operator at the end of any TraceQL query.
`count_over_time`
: Counts the number of matching spans per time interval (see the `step` API parameter)

`min_over_time`
: Returns the minimum value of matching spans per time interval (see the `step` API parameter)

`quantile_over_time`
: The quantile of the values in the specified interval

@@ -91,7 +94,25 @@ down by HTTP route.
This might let you determine that `/api/sad` had a higher rate of erroring
spans than `/api/happy`, for example.

### The `quantile_over_time` and `histogram_over_time` functions
### The `<aggregation>_over_time()` functions

The `count_over_time()` function counts the number of matching spans per time interval.

```
{ name = "GET /:endpoint" } | count_over_time() by (span.http.status_code)
```

The `min_over_time()` function lets you aggregate numerical values by computing their minimum, such as the all-important span duration.

```
{ name = "GET /:endpoint" } | min_over_time(duration) by (span.http.target)
```

Any numerical attribute on the span is fair game.

```
{ name = "GET /:endpoint" } | min_over_time(span.http.status_code)
```

The `quantile_over_time()` and `histogram_over_time()` functions let you aggregate numerical values, such as the all important span duration.
You can specify multiple quantiles in the same query.
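For instance, several duration percentiles can be computed in a single query. This sketch follows the `quantile_over_time()` shape described above; the specific quantile values and grouping attribute are illustrative:

```
{ name = "GET /:endpoint" } | quantile_over_time(duration, .99, .9, .5) by (span.http.target)
```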
4 changes: 4 additions & 0 deletions modules/querier/http.go
@@ -403,6 +403,10 @@ func (q *Querier) QueryRangeHandler(w http.ResponseWriter, r *http.Request) {
errHandler(ctx, span, err)
return
}
// This is to prevent a panic when marshaling a nil response
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}

if resp != nil && resp.Metrics != nil {
span.SetTag("inspectedBytes", resp.Metrics.InspectedBytes)
91 changes: 40 additions & 51 deletions pkg/traceql/ast.go
@@ -1048,6 +1048,8 @@ type MetricsAggregate struct {
agg SpanAggregator
seriesAgg SeriesAggregator
exemplarFn getExemplar
// Type of operation for simple aggregation in layers 2 and 3
simpleAggregationOp SimpleAggregationOp
}

func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggregate {
@@ -1057,6 +1059,14 @@ func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggrega
}
}

func newMetricsAggregateWithAttr(agg MetricsAggregateOp, attr Attribute, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: agg,
attr: attr,
by: by,
}
}

func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: metricsAggregateQuantileOverTime,
@@ -1066,24 +1076,13 @@ func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attr
}
}

func newMetricsAggregateHistogramOverTime(attr Attribute, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: metricsAggregateHistogramOverTime,
by: by,
attr: attr,
}
}

func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
switch a.op {
case metricsAggregateRate, metricsAggregateCountOverTime:
// No extra conditions, start time is already enough
case metricsAggregateQuantileOverTime, metricsAggregateHistogramOverTime:
if !request.HasAttribute(a.attr) {
request.SecondPassConditions = append(request.SecondPassConditions, Condition{
Attribute: a.attr,
})
}
// For metrics aggregators based on a span attribute we have to include it
includeAttribute := a.attr != (Attribute{}) && !request.HasAttribute(a.attr)
if includeAttribute {
request.SecondPassConditions = append(request.SecondPassConditions, Condition{
Attribute: a.attr,
})
}

for _, b := range a.by {
@@ -1096,16 +1095,6 @@ func (a *MetricsAggregate) extractConditions(request *FetchSpansRequest) {
}

func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
switch mode {
case AggregateModeSum:
a.initSum(q)
return

case AggregateModeFinal:
a.initFinal(q)
return
}

// Raw mode:

var innerAgg func() VectorAggregator
@@ -1116,12 +1105,20 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
switch a.op {
case metricsAggregateCountOverTime:
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewMinOverTimeAggregator(a.attr) }
a.simpleAggregationOp = minAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
@@ -1131,6 +1128,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
// I.e. a duration of 500ms will be in __bucket==0.512s
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
byFuncLabel = internalLabelBucket
a.simpleAggregationOp = sumAggregation
switch a.attr {
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
@@ -1142,12 +1140,22 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
// Basic implementation for all other attributes
byFunc = a.bucketizeAttribute
exemplarFn = func(s Span) (float64, uint64) {
v, _ := a.floatizeAttribute(s)
v, _ := FloatizeAttribute(s, a.attr)
return v, a.spanStartTimeMs(s)
}
}
}

switch mode {
case AggregateModeSum:
a.initSum(q)
return

case AggregateModeFinal:
a.initFinal(q)
return
}

a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
}, a.by, byFunc, byFuncLabel)
@@ -1167,28 +1175,8 @@ func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
return NewStaticFloat(Log2Bucketize(d) / float64(time.Second)), true
}

func (a *MetricsAggregate) floatizeAttribute(s Span) (float64, StaticType) {
v, ok := s.AttributeFor(a.attr)
if !ok {
return 0, TypeNil
}

switch v.Type {
case TypeInt:
n, _ := v.Int()
return float64(n), v.Type
case TypeDuration:
d, _ := v.Duration()
return float64(d.Nanoseconds()), v.Type
case TypeFloat:
return v.Float(), v.Type
default:
return 0, TypeNil
}
}

func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
f, t := a.floatizeAttribute(s)
f, t := FloatizeAttribute(s, a.attr)

switch t {
case TypeInt:
@@ -1213,7 +1201,7 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
a.seriesAgg = NewSimpleAdditionCombiner(q)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}

func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
@@ -1222,7 +1210,7 @@ func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
a.seriesAgg = NewHistogramAggregator(q, a.floats)
default:
// These are simple additions by series
a.seriesAgg = NewSimpleAdditionCombiner(q)
a.seriesAgg = NewSimpleCombiner(q, a.simpleAggregationOp)
}
}

@@ -1252,6 +1240,7 @@ func (a *MetricsAggregate) result() SeriesSet {
func (a *MetricsAggregate) validate() error {
switch a.op {
case metricsAggregateCountOverTime:
case metricsAggregateMinOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {
4 changes: 3 additions & 1 deletion pkg/traceql/ast_stringer.go
@@ -179,9 +179,11 @@ func (a MetricsAggregate) String() string {

s.WriteString(a.op.String())
s.WriteString("(")
if a.attr != (Attribute{}) {
s.WriteString(a.attr.String())
}
switch a.op {
case metricsAggregateQuantileOverTime:
s.WriteString(a.attr.String())
s.WriteString(",")
for i, f := range a.floats {
s.WriteString(strconv.FormatFloat(f, 'f', 5, 64))
5 changes: 5 additions & 0 deletions pkg/traceql/ast_test.go
@@ -854,6 +854,11 @@ func (m *mockSpan) WithSpanString(key string, value string) *mockSpan {
return m
}

func (m *mockSpan) WithSpanInt(key string, value int) *mockSpan {
m.attributes[NewScopedAttribute(AttributeScopeSpan, false, key)] = NewStaticInt(value)
return m
}

func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
m.attributes[NewAttribute(key)] = NewStaticBool(value)
return m