Skip to content

Commit

Permalink
Replace CombineTraceProtos with new Combiner (#1291)
Browse files Browse the repository at this point in the history
* Create trace.Combiner which has better performance on multiple/larger trace deduping

* Migrate all CombineTraceProtos to Combiner

* lint

* Review feedback, delete CombineTraceProtos

* Wait until first call to alloc span map, using actual input size

* changelog
  • Loading branch information
mdisibio authored Feb 24, 2022
1 parent 61332ca commit 7d26ee2
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [ENHANCEMENT] Enterprise jsonnet: add config to create tokengen job explicitly [#1256](https://github.com/grafana/tempo/pull/1256) (@kvrhdn)
* [ENHANCEMENT] Add new scaling alerts to the tempo-mixin [#1292](https://github.com/grafana/tempo/pull/1292) (@mapno)
* [ENHANCEMENT] Improve serverless handler error messages [#1305](https://github.com/grafana/tempo/pull/1305) (@joe-elliott)
* [ENHANCEMENT] Make trace combination/compaction more efficient [#1291](https://github.com/grafana/tempo/pull/1291) (@mdisibio)
* [BUGFIX] Remove unnecessary PersistentVolumeClaim [#1245](https://github.com/grafana/tempo/issues/1245)
* [BUGFIX] Fixed issue when query-frontend doesn't log request details when request is cancelled [#1136](https://github.com/grafana/tempo/issues/1136) (@adityapwr)
* [BUGFIX] Update OTLP port in examples (docker-compose & kubernetes) from legacy ports (55680/55681) to new ports (4317/4318) [#1294](https://github.com/grafana/tempo/pull/1294) (@mapno)
Expand Down
11 changes: 6 additions & 5 deletions cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {
}

var (
combinedTrace *tempopb.Trace
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
combiner = trace.NewCombiner()
marshaller = new(jsonpb.Marshaler)
jsonBytes = bytes.Buffer{}
)

fmt.Println()
for _, result := range results {
for i, result := range results {
fmt.Println(result.blockID, ":")

err := marshaller.Marshal(&jsonBytes, result.trace)
Expand All @@ -64,9 +64,10 @@ func (cmd *queryBlocksCmd) Run(ctx *globalOptions) error {

fmt.Println(jsonBytes.String())
jsonBytes.Reset()
combinedTrace, _ = trace.CombineTraceProtos(result.trace, combinedTrace)
combiner.ConsumeWithFinal(result.trace, i == len(results)-1)
}

combinedTrace, _ := combiner.Result()
fmt.Println("combined:")
err = marshaller.Marshal(&jsonBytes, combinedTrace)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions modules/frontend/tracebyidsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

var overallError error
var totalFailedBlocks uint32
overallTrace := &tempopb.Trace{}
combiner := trace.NewCombiner()
combiner.Consume(&tempopb.Trace{}) // The query path returns a non-nil result even if no inputs (which is different than other paths which return nil for no inputs)
statusCode := http.StatusNotFound
statusMsg := "trace not found"

Expand Down Expand Up @@ -139,7 +140,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

// happy path
statusCode = http.StatusOK
overallTrace, _ = trace.CombineTraceProtos(overallTrace, traceResp.Trace)
combiner.Consume(traceResp.Trace)
}(req)
}
wg.Wait()
Expand All @@ -148,6 +149,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, overallError
}

overallTrace, _ := combiner.Result()
if overallTrace == nil || statusCode != http.StatusOK {
// translate non-404s into 500s. if, for instance, we get a 400 back from an internal component
// it means that we created a bad request. 400 should not be propagated back to the user b/c
Expand Down
14 changes: 9 additions & 5 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
span, ctx := opentracing.StartSpanFromContext(ctx, "Querier.FindTraceByID")
defer span.Finish()

var completeTrace *tempopb.Trace
combiner := trace.NewCombiner()
var spanCount, spanCountTotal, traceCountTotal int
if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
Expand All @@ -184,16 +184,18 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
return nil, errors.Wrap(err, "error querying ingesters in Querier.FindTraceByID")
}

found := false
for _, r := range responses {
t := r.response.(*tempopb.TraceByIDResponse).Trace
if t != nil {
completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, t)
spanCount = combiner.Consume(t)
spanCountTotal += spanCount
traceCountTotal++
found = true
}
}
span.LogFields(ot_log.String("msg", "done searching ingesters"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", found),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
Expand Down Expand Up @@ -225,17 +227,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}
}

completeTrace, spanCount = trace.CombineTraceProtos(completeTrace, storeTrace)
spanCount = combiner.Consume(storeTrace)
spanCountTotal += spanCount
traceCountTotal++

span.LogFields(ot_log.String("msg", "combined trace protos from store"),
ot_log.Bool("found", completeTrace != nil),
ot_log.Bool("found", len(partialTraces) > 0),
ot_log.Int("combinedSpans", spanCountTotal),
ot_log.Int("combinedTraces", traceCountTotal))
}
}

completeTrace, _ := combiner.Result()

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{
Expand Down
5 changes: 4 additions & 1 deletion pkg/model/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ func CombineForRead(obj []byte, dataEncoding string, t *tempopb.Trace) (*tempopb
return nil, fmt.Errorf("error unmarshalling obj (%s): %w", dataEncoding, err)
}

combined, _ := trace.CombineTraceProtos(objTrace, t)
c := trace.NewCombiner()
c.Consume(objTrace)
c.ConsumeWithFinal(t, true)
combined, _ := c.Result()

return combined, nil
}
17 changes: 0 additions & 17 deletions pkg/model/combine_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package model

import (
"fmt"
"math/rand"
"testing"

Expand Down Expand Up @@ -102,22 +101,6 @@ func TestCombine(t *testing.T) {
}
}

func BenchmarkCombineTraceProtos(b *testing.B) {
sizes := []int{1, 10, 1000, 10000, 100000}

for _, size := range sizes {
b.Run(fmt.Sprint(size), func(b *testing.B) {
t1 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x02})
t2 := test.MakeTraceWithSpanCount(1, size, []byte{0x01, 0x03})

b.ResetTimer()
for i := 0; i < b.N; i++ {
trace.CombineTraceProtos(t1, t2)
}
})
}
}

func mustMarshalToObject(trace *tempopb.Trace, encoding string) []byte {
return mustMarshalToObjectWithRange(trace, encoding, 0, 0)
}
Expand Down
149 changes: 102 additions & 47 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,132 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

// CombineTraceProtos combines two trace protos into one. Note that it is destructive.
// All spans are combined into traceA. spanCountA, B, and Total are returned for
// logging purposes.
func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int) {
// if one or the other is nil just return 0 for the one that's nil and -1 for the other. this will be a clear indication this
// code path was taken without unnecessarily counting spans
if traceA == nil {
return traceB, -1
}
// token is uint64 to reduce hash collision rates. Experimentally, it was observed
// that fnv32 could approach a collision rate of 1 in 10,000. fnv64 avoids collisions
// when tested against traces with up to 1M spans (see matching test). A collision
// results in a dropped span during combine.
type token uint64

// newHash returns the hash function used to compute span tokens (64-bit FNV-1).
func newHash() hash.Hash64 {
	return fnv.New64()
}

// tokenForID returns a token for use in a hash map given a span id and span kind.
// buffer must be a 4 byte slice and is reused for writing the span kind to the
// hashing function. kind is used along with the actual id b/c in zipkin traces
// span id is not guaranteed to be unique as it is shared between client and
// server spans.
func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token {
	binary.LittleEndian.PutUint32(buffer, uint32(kind))

	// Reset so the hasher can be reused across calls without reallocating.
	h.Reset()
	_, _ = h.Write(b)
	_, _ = h.Write(buffer)
	return token(h.Sum64())
}

// Combiner combines multiple partial traces into one, deduping spans based on
// ID and kind. Note that it is destructive. There are design decisions for
// efficiency:
// * Only scan/hash the spans for each input once, which is reused across calls.
// * Only sort the final result once and if needed.
// * Don't scan/hash the spans for the last input (final=true).
type Combiner struct {
	result   *tempopb.Trace     // accumulated combined trace; nil until the first Consume
	spans    map[token]struct{} // tokens of spans already present in result (dedupe set)
	combined bool               // set once a second input was merged; gates sorting in Result
}

// NewCombiner returns an empty Combiner ready to consume its first trace.
func NewCombiner() *Combiner {
	return new(Combiner)
}

// Consume the given trace and destructively combines its contents.
// Equivalent to ConsumeWithFinal(tr, false), i.e. more inputs may follow.
func (c *Combiner) Consume(tr *tempopb.Trace) (spanCount int) {
	spanCount = c.ConsumeWithFinal(tr, false)
	return spanCount
}

// ConsumeWithFinal consumes the trace, but allows for performance savings when
// it is known that this is the last expected input trace.
func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (spanCount int) {
if tr == nil {
return
}

h := newHash()
buffer := make([]byte, 4)

spansInA := make(map[uint32]struct{})
for _, batchA := range traceA.Batches {
for _, ilsA := range batchA.InstrumentationLibrarySpans {
for _, spanA := range ilsA.Spans {
spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{}
// First call?
if c.result == nil {
c.result = tr

// Pre-alloc map with input size. This saves having to grow the
// map from the small starting size.
n := 0
for _, b := range c.result.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
n += len(ils.Spans)
}
spanCountTotal += len(ilsA.Spans)
}
c.spans = make(map[token]struct{}, n)

for _, b := range c.result.Batches {
for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{}
}
}
}
return
}

// loop through every span and copy spans in B that don't exist to A
for _, batchB := range traceB.Batches {
notFoundILS := batchB.InstrumentationLibrarySpans[:0]

for _, ilsB := range batchB.InstrumentationLibrarySpans {
notFoundSpans := ilsB.Spans[:0]
for _, spanB := range ilsB.Spans {
// if found in A, remove from the batch
_, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)]
for _, b := range tr.Batches {
notFoundILS := b.InstrumentationLibrarySpans[:0]

for _, ils := range b.InstrumentationLibrarySpans {
notFoundSpans := ils.Spans[:0]
for _, s := range ils.Spans {
// if not already encountered, then keep
token := tokenForID(h, buffer, int32(s.Kind), s.SpanId)
_, ok := c.spans[token]
if !ok {
notFoundSpans = append(notFoundSpans, spanB)
notFoundSpans = append(notFoundSpans, s)

// If last expected input, then we don't need to record
// the visited spans. Optimization has significant savings.
if !final {
c.spans[token] = struct{}{}
}
}
}

if len(notFoundSpans) > 0 {
spanCountTotal += len(notFoundSpans)
ilsB.Spans = notFoundSpans
notFoundILS = append(notFoundILS, ilsB)
ils.Spans = notFoundSpans
spanCount += len(notFoundSpans)
notFoundILS = append(notFoundILS, ils)
}
}

// if there were some spans not found in A, add everything left in the batch
if len(notFoundILS) > 0 {
batchB.InstrumentationLibrarySpans = notFoundILS
traceA.Batches = append(traceA.Batches, batchB)
b.InstrumentationLibrarySpans = notFoundILS
c.result.Batches = append(c.result.Batches, b)
}
}

SortTrace(traceA)

return traceA, spanCountTotal
c.combined = true
return
}

// tokenForID returns a uint32 token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 {
binary.LittleEndian.PutUint32(buffer, uint32(kind))
// Result returns the final trace and span count.
func (c *Combiner) Result() (*tempopb.Trace, int) {
spanCount := -1

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return h.Sum32()
if c.result != nil && c.combined {
// Only if anything combined
SortTrace(c.result)
spanCount = len(c.spans)
}

return c.result, spanCount
}
Loading

0 comments on commit 7d26ee2

Please sign in to comment.