Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort traces on flush to ensure consistent payloads in the backend #606

Merged
merged 9 commits into from
Mar 23, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [BUGFIX] Fixes a bug where some blocks were not searched due to query sharding and randomness in blocklist poll. [#583](https://github.com/grafana/tempo/pull/583)
* [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)
* [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605)
* [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606)

## v0.6.0

Expand Down
7 changes: 6 additions & 1 deletion modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -73,6 +74,7 @@ func TestFullTraceReturned(t *testing.T) {
_, err = rand.Read(traceID)
assert.NoError(t, err)
trace := test.MakeTrace(2, traceID) // 2 batches
util.SortTrace(trace)

// push the first batch
_, err = ingester.Push(ctx,
Expand Down Expand Up @@ -218,7 +220,10 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
_, err = rand.Read(id)
require.NoError(t, err)

traces = append(traces, test.MakeTrace(10, id))
trace := test.MakeTrace(10, id)
util.SortTrace(trace)

traces = append(traces, trace)
traceIDs = append(traceIDs, id)
}

Expand Down
3 changes: 3 additions & 0 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
tracesToCut := i.tracesToCut(cutoff, immediate)

for _, t := range tracesToCut {

util.SortTrace(t.trace)

out, err := proto.Marshal(t.trace)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestReturnAllHits(t *testing.T) {

// expected trace
expectedTrace, _, _, _ := util.CombineTraceProtos(testTraces[0], testTraces[1])
test.SortTrace(expectedTrace)
util.SortTrace(expectedTrace)

// actual trace
actualTraceBytes, err := util.CombineTraces(foundBytes[1], foundBytes[0])
Expand All @@ -117,6 +117,6 @@ func TestReturnAllHits(t *testing.T) {
err = proto.Unmarshal(actualTraceBytes, actualTrace)
assert.NoError(t, err)

test.SortTrace(actualTrace)
util.SortTrace(actualTrace)
assert.Equal(t, expectedTrace, actualTrace)
}
20 changes: 0 additions & 20 deletions pkg/util/test/req.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package test

import (
"bytes"
"math/rand"
"sort"

"github.com/grafana/tempo/pkg/tempopb"
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
Expand Down Expand Up @@ -71,21 +69,3 @@ func MakeTraceWithSpanCount(requests int, spansEach int, traceID []byte) *tempop

return trace
}

func SortTrace(t *tempopb.Trace) {
sort.Slice(t.Batches, func(i, j int) bool {
return bytes.Compare(t.Batches[i].InstrumentationLibrarySpans[0].Spans[0].SpanId, t.Batches[j].InstrumentationLibrarySpans[0].Spans[0].SpanId) == 1
})

for _, b := range t.Batches {
sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool {
return bytes.Compare(b.InstrumentationLibrarySpans[i].Spans[0].SpanId, b.InstrumentationLibrarySpans[j].Spans[0].SpanId) == 1
})

for _, ils := range b.InstrumentationLibrarySpans {
sort.Slice(ils.Spans, func(i, j int) bool {
return bytes.Compare(ils.Spans[i].SpanId, ils.Spans[j].SpanId) == 1
})
}
}
}
47 changes: 46 additions & 1 deletion pkg/util/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"hash"
"hash/fnv"
"sort"

"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

func CombineTraces(objA []byte, objB []byte) ([]byte, error) {
Expand All @@ -19,7 +21,7 @@ func CombineTraces(objA []byte, objB []byte) ([]byte, error) {
return objA, nil
}

// hashes differ. unmarshal and combine traces
// bytes differ. unmarshal and combine traces
traceA := &tempopb.Trace{}
traceB := &tempopb.Trace{}

Expand Down Expand Up @@ -107,9 +109,52 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int
}
}

SortTrace(traceA)

return traceA, spanCountA, spanCountB, spanCountTotal
}

func SortTrace(t *tempopb.Trace) {
// Sort bottom up by span start times
for _, b := range t.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
sort.Slice(ils.Spans, func(i, j int) bool {
return compareSpans(ils.Spans[i], ils.Spans[j])
})
}
sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool {
return compareIls(b.InstrumentationLibrarySpans[i], b.InstrumentationLibrarySpans[j])
})
}
sort.Slice(t.Batches, func(i, j int) bool {
return compareBatches(t.Batches[i], t.Batches[j])
})
}

func compareBatches(a *v1.ResourceSpans, b *v1.ResourceSpans) bool {
if len(a.InstrumentationLibrarySpans) > 0 && len(b.InstrumentationLibrarySpans) > 0 {
return compareIls(a.InstrumentationLibrarySpans[0], b.InstrumentationLibrarySpans[0])
}
return false
}

func compareIls(a *v1.InstrumentationLibrarySpans, b *v1.InstrumentationLibrarySpans) bool {
if len(a.Spans) > 0 && len(b.Spans) > 0 {
return compareSpans(a.Spans[0], b.Spans[0])
}
return false
}

func compareSpans(a *v1.Span, b *v1.Span) bool {
// Sort by start time, then id

if a.StartTimeUnixNano == b.StartTimeUnixNano {
return bytes.Compare(a.SpanId, b.SpanId) == -1
}

return a.StartTimeUnixNano < b.StartTimeUnixNano
}

func tokenForID(h hash.Hash32, b []byte) uint32 {
h.Reset()
_, _ = h.Write(b)
Expand Down
80 changes: 77 additions & 3 deletions pkg/util/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/assert"
)
Expand All @@ -16,6 +17,9 @@ func TestCombine(t *testing.T) {
t1 := test.MakeTrace(10, []byte{0x01, 0x02})
t2 := test.MakeTrace(10, []byte{0x01, 0x03})

SortTrace(t1)
SortTrace(t2)

b1, err := proto.Marshal(t1)
assert.NoError(t, err)
b2, err := proto.Marshal(t2)
Expand Down Expand Up @@ -95,9 +99,6 @@ func TestCombine(t *testing.T) {
err = proto.Unmarshal(actual, actualTrace)
assert.NoError(t, err)

test.SortTrace(actualTrace)
test.SortTrace(expectedTrace)

assert.Equal(t, expectedTrace, actualTrace)
}
}
Expand Down Expand Up @@ -200,3 +201,76 @@ func BenchmarkCombineTraceProtos(b *testing.B) {
})
}
}

func TestSortTrace(t *testing.T) {
tests := []struct {
input *tempopb.Trace
expected *tempopb.Trace
}{
{
input: &tempopb.Trace{},
expected: &tempopb.Trace{},
},

{
input: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
StartTimeUnixNano: 2,
},
},
},
},
},
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
StartTimeUnixNano: 1,
},
},
},
},
},
},
},
expected: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
StartTimeUnixNano: 1,
},
},
},
},
},
{
InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{
{
Spans: []*v1.Span{
{
StartTimeUnixNano: 2,
},
},
},
},
},
},
},
},
}

for _, tt := range tests {
SortTrace(tt.input)

assert.Equal(t, tt.expected, tt.input)
}
}