Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added min_over_time TraceQL metric #3975

Merged
merged 22 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,14 @@ func newMetricsAggregate(agg MetricsAggregateOp, by []Attribute) *MetricsAggrega
}
}

func newMetricsAggregateWithAttr(agg MetricsAggregateOp, attr Attribute, by []Attribute) *MetricsAggregate {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
return &MetricsAggregate{
op: agg,
attr: attr,
by: by,
}
}

func newMetricsAggregateQuantileOverTime(attr Attribute, qs []float64, by []Attribute) *MetricsAggregate {
return &MetricsAggregate{
op: metricsAggregateQuantileOverTime,
Expand Down Expand Up @@ -1073,6 +1081,11 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}
case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewMinOverTimeAggregator(a.attr) }
exemplarFn = func(s Span) (float64, uint64) {
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
Expand Down Expand Up @@ -1206,6 +1219,7 @@ func (a *MetricsAggregate) result() SeriesSet {
func (a *MetricsAggregate) validate() error {
switch a.op {
case metricsAggregateCountOverTime:
case metricsAggregateMinOverTime:
case metricsAggregateRate:
case metricsAggregateHistogramOverTime:
if len(a.by) >= maxGroupBys {
Expand Down
5 changes: 5 additions & 0 deletions pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,11 @@ func (m *mockSpan) WithSpanString(key string, value string) *mockSpan {
return m
}

func (m *mockSpan) WithSpanInt(key string, value int) *mockSpan {
m.attributes[NewScopedAttribute(AttributeScopeSpan, false, key)] = NewStaticInt(value)
return m
}

func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
m.attributes[NewAttribute(key)] = NewStaticBool(value)
return m
Expand Down
65 changes: 64 additions & 1 deletion pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,69 @@ func (c *CountOverTimeAggregator) Sample() float64 {
return c.count * c.rateMult
}

// MinOverTimeAggregator it calculates the mininum value over time. It can also
// calculate the rate when given a multiplier.
type MinOverTimeAggregator struct {
firstTime bool
getSpanAttValue func(s Span) float64
min float64
rateMult float64
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
}

var _ VectorAggregator = (*MinOverTimeAggregator)(nil)

func NewMinOverTimeAggregator(attr Attribute) *MinOverTimeAggregator {
var fn func(s Span) float64
switch attr {
case IntrinsicDurationAttribute:
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
fn = func(s Span) float64 {
return float64(s.DurationNanos())
}
default:
fn = func(s Span) float64 {
return floatizeAttribute(s, attr)
}
}
return &MinOverTimeAggregator{
getSpanAttValue: fn,
rateMult: 1.0,
}
}

func (c *MinOverTimeAggregator) Observe(s Span) {
val := c.getSpanAttValue(s)
if !c.firstTime {
c.min = val
c.firstTime = true
} else if val < c.min {
c.min = val
}
}

func (c *MinOverTimeAggregator) Sample() float64 {
return c.min * c.rateMult
javiermolinar marked this conversation as resolved.
Show resolved Hide resolved
}

// Copyed from ast.go. Probably should be refactored elsewhere
func floatizeAttribute(s Span, attr Attribute) float64 {
v, ok := s.AttributeFor(attr)
if !ok {
return 0
}
switch v.Type {
case TypeInt:
n, _ := v.Int()
return float64(n)
case TypeDuration:
d, _ := v.Duration()
return float64(d.Nanoseconds())
case TypeFloat:
return v.Float()
default:
return 0
}
}

// StepAggregator sorts spans into time slots using a step interval like 30s or 1m
type StepAggregator struct {
start, end, step uint64
Expand Down Expand Up @@ -724,7 +787,7 @@ func (e *Engine) CompileMetricsQueryRangeNonRaw(req *tempopb.QueryRangeRequest,
}, nil
}

// CompileMetricsQueryRange returns an evalulator that can be reused across multiple data sources.
// CompileMetricsQueryRange returns an evaluator that can be reused across multiple data sources.
// Dedupe spans parameter is an indicator of whether to expect duplicates in the datasource. For
// example if the datasource is replication factor=1 or only a single block then we know there
// aren't duplicates, and we can make some optimizations.
Expand Down
171 changes: 142 additions & 29 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ func TestQuantileOverTime(t *testing.T) {
}

var (
e = NewEngine()
_128ns = 0.000000128
_256ns = 0.000000256
_512ns = 0.000000512
Expand Down Expand Up @@ -437,43 +436,152 @@ func TestQuantileOverTime(t *testing.T) {
},
}

// 3 layers of processing matches: query-frontend -> queriers -> generators -> blocks
layer1, err := e.CompileMetricsQueryRange(req, false, 0, 0, false)
require.NoError(t, err)
testTraceQlMetric(t, in, out, req)
}

layer2, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeSum)
require.NoError(t, err)
func percentileHelper(q float64, values ...float64) float64 {
h := Histogram{}
for _, v := range values {
h.Record(v, 1)
}
return Log2Quantile(q, h.Buckets)
}

layer3, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeFinal)
require.NoError(t, err)
func TestCountOverTime(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | count_over_time() by (span.foo)",
}

// Pass spans to layer 1
for _, s := range in {
layer1.metricsPipeline.observe(s)
// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

// Pass layer 1 to layer 2
// These are partial counts over time by bucket
res := layer1.Results()
layer2.metricsPipeline.observeSeries(res.ToProto(req))
// Output series with quantiles per foo
// Prom labels are sorted alphabetically, traceql labels maintain original order.
out := SeriesSet{
`{span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
},
Values: []float64{0, 0, 3},
Exemplars: make([]Exemplar, 0),
},
`{span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
},
Values: []float64{3, 4, 0},
Exemplars: make([]Exemplar, 0),
},
}

// Pass layer 2 to layer 3
// These are summed counts over time by bucket
res = layer2.Results()
layer3.ObserveSeries(res.ToProto(req))
testTraceQlMetric(t, in, out, req)
}

// Layer 3 final results
// The quantiles
final := layer3.Results()
require.Equal(t, out, final)
func TestMinOverTimeForDuration(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | min_over_time(duration) by (span.foo)",
}

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

// Output series with quantiles per foo
// Prom labels are sorted alphabetically, traceql labels maintain original order.
out := SeriesSet{
`{span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
},
Values: []float64{0, 0, 512},
Exemplars: make([]Exemplar, 0),
},
`{span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
},
Values: []float64{128, 8, 0},
Exemplars: make([]Exemplar, 0),
},
}

testTraceQlMetric(t, in, out, req)
}

func percentileHelper(q float64, values ...float64) float64 {
h := Histogram{}
for _, v := range values {
h.Record(v, 1)
func TestMinOverTimeForSpanAttribute(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{ } | min_over_time(span.http.status_code) by (span.foo)",
}
return Log2Quantile(q, h.Buckets)

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 404).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 201).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 401).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

// Output series with quantiles per foo
// Prom labels are sorted alphabetically, traceql labels maintain original order.
out := SeriesSet{
`{span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
},
Values: []float64{0, 0, 201},
Exemplars: make([]Exemplar, 0),
},
`{span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
},
Values: []float64{200, 200, 0},
Exemplars: make([]Exemplar, 0),
},
}
testTraceQlMetric(t, in, out, req)
}

func TestHistogramOverTime(t *testing.T) {
Expand All @@ -485,7 +593,6 @@ func TestHistogramOverTime(t *testing.T) {
}

var (
e = NewEngine()
_128ns = NewStaticFloat(0.000000128)
_256ns = NewStaticFloat(0.000000256)
_512ns = NewStaticFloat(0.000000512)
Expand Down Expand Up @@ -544,6 +651,11 @@ func TestHistogramOverTime(t *testing.T) {
},
}

testTraceQlMetric(t, in, out, req)
}

func testTraceQlMetric(t *testing.T, in []Span, out SeriesSet, req *tempopb.QueryRangeRequest) {
e := NewEngine()
// 3 layers of processing matches: query-frontend -> queriers -> generators -> blocks
layer1, err := e.CompileMetricsQueryRange(req, false, 0, 0, false)
require.NoError(t, err)
Expand Down Expand Up @@ -572,6 +684,7 @@ func TestHistogramOverTime(t *testing.T) {
// Layer 3 final results
// The quantiles
final := layer3.Results()

require.Equal(t, out, final)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/traceql/enum_aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type MetricsAggregateOp int
const (
metricsAggregateRate MetricsAggregateOp = iota
metricsAggregateCountOverTime
metricsAggregateMinOverTime
metricsAggregateQuantileOverTime
metricsAggregateHistogramOverTime
)
Expand All @@ -64,6 +65,8 @@ func (a MetricsAggregateOp) String() string {
return "rate"
case metricsAggregateCountOverTime:
return "count_over_time"
case metricsAggregateMinOverTime:
return "min_over_time"
case metricsAggregateQuantileOverTime:
return "quantile_over_time"
case metricsAggregateHistogramOverTime:
Expand Down
4 changes: 3 additions & 1 deletion pkg/traceql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import (
COUNT AVG MAX MIN SUM
BY COALESCE SELECT
END_ATTRIBUTE
RATE COUNT_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
RATE COUNT_OVER_TIME MIN_OVER_TIME QUANTILE_OVER_TIME HISTOGRAM_OVER_TIME COMPARE
WITH

// Operators are listed with increasing precedence.
Expand Down Expand Up @@ -297,6 +297,8 @@ metricsAggregation:
| RATE OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateRate, $6) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, nil) }
| COUNT_OVER_TIME OPEN_PARENS CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregate(metricsAggregateCountOverTime, $6) }
| MIN_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMinOverTime, $3, nil) }
| MIN_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateWithAttr(metricsAggregateMinOverTime, $3, $7) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, nil) }
| QUANTILE_OVER_TIME OPEN_PARENS attribute COMMA numericList CLOSE_PARENS BY OPEN_PARENS attributeList CLOSE_PARENS { $$ = newMetricsAggregateQuantileOverTime($3, $5, $9) }
| HISTOGRAM_OVER_TIME OPEN_PARENS attribute CLOSE_PARENS { $$ = newMetricsAggregateHistogramOverTime($3, nil) }
Expand Down
Loading
Loading