Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Zipkin: CombineTraces #688

Merged
merged 2 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Improve WAL Replay by not rebuilding the WAL. [#668](https://github.com/grafana/tempo/pull/668)
* [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677)
* [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679)
* [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688)

## v0.7.0

Expand Down
15 changes: 12 additions & 3 deletions pkg/util/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"bytes"
"encoding/binary"
"hash"
"hash/fnv"
"sort"
Expand Down Expand Up @@ -68,12 +69,13 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int
spanCountTotal := 0

h := fnv.New32()
buffer := make([]byte, 4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

32 / 8 = 4?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol i meant i didn't think we need more than 1 byte


spansInA := make(map[uint32]struct{})
for _, batchA := range traceA.Batches {
for _, ilsA := range batchA.InstrumentationLibrarySpans {
for _, spanA := range ilsA.Spans {
spansInA[tokenForID(h, spanA.SpanId)] = struct{}{}
spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{}
}
spanCountA += len(ilsA.Spans)
spanCountTotal += len(ilsA.Spans)
Expand All @@ -88,7 +90,7 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int
notFoundSpans := ilsB.Spans[:0]
for _, spanB := range ilsB.Spans {
// if found in A, remove from the batch
_, ok := spansInA[tokenForID(h, spanB.SpanId)]
_, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)]
if !ok {
notFoundSpans = append(notFoundSpans, spanB)
}
Expand Down Expand Up @@ -155,8 +157,15 @@ func compareSpans(a *v1.Span, b *v1.Span) bool {
return a.StartTimeUnixNano < b.StartTimeUnixNano
}

func tokenForID(h hash.Hash32, b []byte) uint32 {
// tokenForID returns a uint32 token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 {
binary.LittleEndian.PutUint32(buffer, uint32(kind))

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return h.Sum32()
}
12 changes: 12 additions & 0 deletions pkg/util/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"bytes"
"fmt"
"hash/fnv"
"math/rand"
"testing"

Expand Down Expand Up @@ -274,3 +275,14 @@ func TestSortTrace(t *testing.T) {
assert.Equal(t, tt.expected, tt.input)
}
}

func BenchmarkTokenForID(b *testing.B) {
h := fnv.New32()
id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
buffer := make([]byte, 4)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = tokenForID(h, buffer, 0, id)
}
}