Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetMetrics second pass + other improvements #2501

Merged
merged 15 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/vparquet2"
"github.com/grafana/tempo/tempodb/wal"
)

Expand Down Expand Up @@ -42,7 +42,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
cfg.TracesWAL.Version = vparquet2.VersionString
mdisibio marked this conversation as resolved.
Show resolved Hide resolved

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/processor/localblocks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"

"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet2"
)

const (
Expand All @@ -26,7 +26,7 @@ type Config struct {

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.Block = &common.BlockConfig{}
cfg.Block.Version = encoding.DefaultEncoding().Version()
cfg.Block.Version = vparquet2.VersionString
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
cfg.Block.RegisterFlagsAndApplyDefaults(prefix, f)

cfg.Search = &tempodb.SearchConfig{}
Expand Down
31 changes: 17 additions & 14 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
p.blocksMtx.RLock()
defer p.blocksMtx.RUnlock()

var err error

// Blocks to check
blocks := make([]common.BackendBlock, 0, 1+len(p.walBlocks)+len(p.completeBlocks))
if p.headBlock != nil {
Expand All @@ -280,23 +282,23 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
for _, b := range blocks {

// Including the trace count in the cache key means we can safely
// cache results for a wal block
// cache results for a wal block which can receive new data
key := fmt.Sprintf("b:%s-c:%d-q:%s-g:%s", b.BlockMeta().BlockID.String(), b.BlockMeta().TotalObjects, req.Query, req.GroupBy)
if r := p.metricsCacheGet(key); r != nil {
m.Combine(r)
continue
}

f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return b.Fetch(ctx, req, common.DefaultSearchOptions())
})
r := p.metricsCacheGet(key)
if r == nil {

r, err := traceqlmetrics.GetMetrics(ctx, req.Query, req.GroupBy, 0, f)
if err != nil {
return nil, err
}
f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return b.Fetch(ctx, req, common.DefaultSearchOptions())
})

p.metricsCacheSet(key, r)
r, err = traceqlmetrics.GetMetrics(ctx, req.Query, req.GroupBy, 0, f)
if err != nil {
return nil, err
}

p.metricsCacheSet(key, r)
}

m.Combine(r)

Expand Down Expand Up @@ -391,7 +393,6 @@ func (p *Processor) deleteOldBlocks() (err error) {

func (p *Processor) cutIdleTraces(immediate bool) error {
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

// Record live traces before flushing so we know the high water mark
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))
Expand All @@ -403,6 +404,8 @@ func (p *Processor) cutIdleTraces(immediate bool) error {

tracesToCut := p.liveTraces.CutIdle(since)

p.liveTracesMtx.Unlock()

if len(tracesToCut) == 0 {
return nil
}
Expand Down
36 changes: 23 additions & 13 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,30 @@ func newSpansetOperation(op Operator, lhs SpansetExpression, rhs SpansetExpressi
func (SpansetOperation) __spansetExpression() {}

type SpansetFilter struct {
Expression FieldExpression
Expression FieldExpression
outputBuffer []*Spanset
matchingSpansBuffer []Span
}

func newSpansetFilter(e FieldExpression) SpansetFilter {
return SpansetFilter{
func newSpansetFilter(e FieldExpression) *SpansetFilter {
return &SpansetFilter{
Expression: e,
}
}

// nolint: revive
func (SpansetFilter) __spansetExpression() {}
func (*SpansetFilter) __spansetExpression() {}

func (f SpansetFilter) evaluate(input []*Spanset) ([]*Spanset, error) {
var output []*Spanset
func (f *SpansetFilter) evaluate(input []*Spanset) ([]*Spanset, error) {
f.outputBuffer = f.outputBuffer[:0]

for _, ss := range input {
if len(ss.Spans) == 0 {
continue
}

var matchingSpans []Span
f.matchingSpansBuffer = f.matchingSpansBuffer[:0]
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved

for _, s := range ss.Spans {
result, err := f.Expression.execute(s)
if err != nil {
Expand All @@ -285,19 +288,26 @@ func (f SpansetFilter) evaluate(input []*Spanset) ([]*Spanset, error) {
continue
}

matchingSpans = append(matchingSpans, s)
f.matchingSpansBuffer = append(f.matchingSpansBuffer, s)
}

if len(f.matchingSpansBuffer) == 0 {
continue
}

if len(matchingSpans) == 0 {
if len(f.matchingSpansBuffer) == len(ss.Spans) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, I think this is safe, but I need to mention the bug where manipulating the input slice of spans caused the double add to the sync pool and panicked everything.

we do have tests that cover this, and working through this in my head makes me believe there are no issues.

// All matched, so we return the input as-is
// and preserve the local buffer.
f.outputBuffer = append(f.outputBuffer, ss)
continue
}

matchingSpanset := *ss
matchingSpanset.Spans = matchingSpans
output = append(output, &matchingSpanset)
matchingSpanset := ss.clone()
matchingSpanset.Spans = append([]Span(nil), f.matchingSpansBuffer...)
f.outputBuffer = append(f.outputBuffer, matchingSpanset)
}

return output, nil
return f.outputBuffer, nil
}

type ScalarFilter struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/traceql/ast_conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestSpansetFilter_extractConditions(t *testing.T) {
expr, err := Parse(tt.query)
require.NoError(t, err)

spansetFilter := expr.Pipeline.Elements[0].(SpansetFilter)
spansetFilter := expr.Pipeline.Elements[0].(*SpansetFilter)

req := &FetchSpansRequest{
Conditions: []Condition{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestSpansetFilterEvaluate(t *testing.T) {
ast, err := Parse(tc.query)
require.NoError(t, err)

filt := ast.Pipeline.Elements[0].(SpansetFilter)
filt := ast.Pipeline.Elements[0].(*SpansetFilter)

actual, err := filt.evaluate(tc.input)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/traceql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
spansetPipelineExpression SpansetExpression
wrappedSpansetPipeline Pipeline
spansetPipeline Pipeline
spansetFilter SpansetFilter
spansetFilter *SpansetFilter
scalarFilter ScalarFilter
scalarFilterOperation Operator

Expand Down
2 changes: 1 addition & 1 deletion pkg/traceql/expr.y.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,8 @@ func (s *Spanset) AddAttribute(key string, value Static) {
}

func (s *Spanset) clone() *Spanset {
return &Spanset{
TraceID: s.TraceID,
Scalar: s.Scalar,
RootSpanName: s.RootSpanName,
RootServiceName: s.RootServiceName,
StartTimeUnixNanos: s.StartTimeUnixNanos,
DurationNanos: s.DurationNanos,
Spans: s.Spans, // we're not deep cloning into the spans or attributes
Attributes: s.Attributes,
}
ss := *s
return &ss
}

type SpansetIterator interface {
Expand Down
59 changes: 34 additions & 25 deletions pkg/traceqlmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int
)

// Ensure that we select the span duration, status, and group-by attribute
// if they are not already included in the query. These are fetched
// without filtering.
// if they are not already included in the query.
addConditionIfNotPresent := func(a traceql.Attribute) {
for _, c := range req.Conditions {
if c.Attribute == a {
Expand All @@ -159,32 +158,17 @@ func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int

req.Conditions = append(req.Conditions, traceql.Condition{Attribute: a})
}
addConditionIfNotPresent(duration)
addConditionIfNotPresent(status)
addConditionIfNotPresent(duration)
addConditionIfNotPresent(groupByAttr)

// Perform the fetch and process the results inside the Filter
// callback. No actual results will be returned from this fetch call,
// But we still need to call Next() at least once.
res, err := fetcher.Fetch(ctx, *req)
if err == util.ErrUnsupported {
return nil, nil
}
if err != nil {
return nil, err
}

for {
ss, err := res.Results.Next(ctx)
if err != nil {
return nil, err
}
if ss == nil {
break
}

// Run engine to assert final query conditions
out, err := eval([]*traceql.Spanset{ss})
// Read the spans in the second pass callback and return nil to discard them.
// We do this because it lets the fetch layer repool the spans because it
// knows we discarded them.
// TODO - Add span.Release() or something that we could use in the loop
// at the bottom to repool the spans?
req.SecondPass = func(s *traceql.Spanset) ([]*traceql.Spanset, error) {
out, err := eval([]*traceql.Spanset{s})
if err != nil {
return nil, err
}
Expand All @@ -206,6 +190,31 @@ func GetMetrics(ctx context.Context, query string, groupBy string, spanLimit int
}
}
}

return nil, err
}

// Perform the fetch and process the results inside the SecondPass
// callback. No actual results will be returned from this fetch call,
// But we still need to call Next() at least once.
res, err := fetcher.Fetch(ctx, *req)
if err == util.ErrUnsupported {
return nil, nil
}
if err != nil {
return nil, err
}

defer res.Results.Close()

for {
ss, err := res.Results.Next(ctx)
if err != nil {
return nil, err
}
if ss == nil {
break
}
}

// The results are estimated if we bailed early due to limit being reached, but only if spanLimit has been set.
Expand Down
Loading