Skip to content

Commit

Permalink
Merge pull request #15 from opentracing/interface_carriers
Browse files Browse the repository at this point in the history
Move to interface-based carriers
  • Loading branch information
bensigelman committed Mar 21, 2016
2 parents ee13d47 + 2bd87dd commit 291bfac
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 136 deletions.
179 changes: 57 additions & 122 deletions propagation_ot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ package basictracer

import (
"bytes"
"encoding/base64"
"encoding/binary"
"io"
"net/http"
"strconv"
"strings"
"time"

opentracing "github.com/opentracing/opentracing-go"
)

type splitTextPropagator struct {
type textMapPropagator struct {
tracer *tracerImpl
}
type splitBinaryPropagator struct {
type binaryPropagator struct {
tracer *tracerImpl
}
type goHTTPPropagator struct {
*splitBinaryPropagator
*textMapPropagator
}

const (
Expand All @@ -33,82 +31,76 @@ const (
fieldNameSampled = prefixTracerState + "sampled"
)

func (p *splitTextPropagator) Inject(
func (p *textMapPropagator) Inject(
sp opentracing.Span,
carrier interface{},
opaqueCarrier interface{},
) error {
sc, ok := sp.(*spanImpl)
if !ok {
return opentracing.ErrInvalidSpan
}
splitTextCarrier, ok := carrier.(*opentracing.SplitTextCarrier)
carrier, ok := opaqueCarrier.(opentracing.TextMapWriter)
if !ok {
return opentracing.ErrInvalidCarrier
}
if splitTextCarrier.TracerState == nil {
splitTextCarrier.TracerState = make(map[string]string, tracerStateFieldCount)
}
splitTextCarrier.TracerState[fieldNameTraceID] = strconv.FormatInt(sc.raw.TraceID, 16)
splitTextCarrier.TracerState[fieldNameSpanID] = strconv.FormatInt(sc.raw.SpanID, 16)
splitTextCarrier.TracerState[fieldNameSampled] = strconv.FormatBool(sc.raw.Sampled)
carrier.Set(fieldNameTraceID, strconv.FormatInt(sc.raw.TraceID, 16))
carrier.Set(fieldNameSpanID, strconv.FormatInt(sc.raw.SpanID, 16))
carrier.Set(fieldNameSampled, strconv.FormatBool(sc.raw.Sampled))

sc.Lock()
if l := len(sc.raw.Baggage); l > 0 && splitTextCarrier.Baggage == nil {
splitTextCarrier.Baggage = make(map[string]string, l)
}
for k, v := range sc.raw.Baggage {
splitTextCarrier.Baggage[prefixBaggage+k] = v
carrier.Set(prefixBaggage+k, v)
}
sc.Unlock()
return nil
}

func (p *splitTextPropagator) Join(
func (p *textMapPropagator) Join(
operationName string,
carrier interface{},
opaqueCarrier interface{},
) (opentracing.Span, error) {
splitTextCarrier, ok := carrier.(*opentracing.SplitTextCarrier)
carrier, ok := opaqueCarrier.(opentracing.TextMapReader)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
requiredFieldCount := 0
var traceID, propagatedSpanID int64
var sampled bool
var err error
for k, v := range splitTextCarrier.TracerState {
decodedBaggage := make(map[string]string)
err = carrier.ForeachKey(func(k, v string) error {
switch strings.ToLower(k) {
case fieldNameTraceID:
traceID, err = strconv.ParseInt(v, 16, 64)
if err != nil {
return nil, opentracing.ErrTraceCorrupted
return opentracing.ErrTraceCorrupted
}
case fieldNameSpanID:
propagatedSpanID, err = strconv.ParseInt(v, 16, 64)
if err != nil {
return nil, opentracing.ErrTraceCorrupted
return opentracing.ErrTraceCorrupted
}
case fieldNameSampled:
sampled, err = strconv.ParseBool(v)
if err != nil {
return nil, opentracing.ErrTraceCorrupted
return opentracing.ErrTraceCorrupted
}
default:
continue
}
requiredFieldCount++
}
var decodedBaggage map[string]string
if splitTextCarrier.Baggage != nil {
decodedBaggage = make(map[string]string)
for k, v := range splitTextCarrier.Baggage {
lowercaseK := strings.ToLower(k)
if strings.HasPrefix(lowercaseK, prefixBaggage) {
decodedBaggage[strings.TrimPrefix(lowercaseK, prefixBaggage)] = v
}
// Balance off the requiredFieldCount++ just below...
requiredFieldCount--
}
requiredFieldCount++
return nil
})
if err != nil {
return nil, err
}
if requiredFieldCount < tracerStateFieldCount {
if len(splitTextCarrier.TracerState) == 0 {
if requiredFieldCount == 0 {
return nil, opentracing.ErrTraceNotFound
}
return nil, opentracing.ErrTraceCorrupted
Expand All @@ -133,15 +125,15 @@ func (p *splitTextPropagator) Join(
), nil
}

func (p *splitBinaryPropagator) Inject(
func (p *binaryPropagator) Inject(
sp opentracing.Span,
carrier interface{},
opaqueCarrier interface{},
) error {
sc, ok := sp.(*spanImpl)
if !ok {
return opentracing.ErrInvalidSpan
}
splitBinaryCarrier, ok := carrier.(*opentracing.SplitBinaryCarrier)
carrier, ok := opaqueCarrier.(io.Writer)
if !ok {
return opentracing.ErrInvalidCarrier
}
Expand All @@ -152,74 +144,73 @@ func (p *splitBinaryPropagator) Inject(
}

// Handle the trace and span ids, and sampled status.
contextBuf := bytes.NewBuffer(splitBinaryCarrier.TracerState[:0])
err = binary.Write(contextBuf, binary.BigEndian, sc.raw.TraceID)
err = binary.Write(carrier, binary.BigEndian, sc.raw.TraceID)
if err != nil {
return err
}

err = binary.Write(contextBuf, binary.BigEndian, sc.raw.SpanID)
err = binary.Write(carrier, binary.BigEndian, sc.raw.SpanID)
if err != nil {
return err
}

err = binary.Write(contextBuf, binary.BigEndian, sampledByte)
err = binary.Write(carrier, binary.BigEndian, sampledByte)
if err != nil {
return err
}

// Handle the baggage.
baggageBuf := bytes.NewBuffer(splitBinaryCarrier.Baggage[:0])
err = binary.Write(baggageBuf, binary.BigEndian, int32(len(sc.raw.Baggage)))
err = binary.Write(carrier, binary.BigEndian, int32(len(sc.raw.Baggage)))
if err != nil {
return err
}
for k, v := range sc.raw.Baggage {
if err = binary.Write(baggageBuf, binary.BigEndian, int32(len(k))); err != nil {
for key, val := range sc.raw.Baggage {
if err = binary.Write(carrier, binary.BigEndian, int32(len(key))); err != nil {
return err
}
baggageBuf.WriteString(k)
if err = binary.Write(baggageBuf, binary.BigEndian, int32(len(v))); err != nil {
if _, err = io.WriteString(carrier, key); err != nil {
return err
}

if err = binary.Write(carrier, binary.BigEndian, int32(len(val))); err != nil {
return err
}
if _, err = io.WriteString(carrier, val); err != nil {
return err
}
baggageBuf.WriteString(v)
}

splitBinaryCarrier.TracerState = contextBuf.Bytes()
splitBinaryCarrier.Baggage = baggageBuf.Bytes()
return nil
}

func (p *splitBinaryPropagator) Join(
func (p *binaryPropagator) Join(
operationName string,
carrier interface{},
opaqueCarrier interface{},
) (opentracing.Span, error) {
splitBinaryCarrier, ok := carrier.(*opentracing.SplitBinaryCarrier)
carrier, ok := opaqueCarrier.(io.Reader)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
if len(splitBinaryCarrier.TracerState) == 0 {
return nil, opentracing.ErrTraceNotFound
}
// Handle the trace, span ids, and sampled status.
contextReader := bytes.NewReader(splitBinaryCarrier.TracerState)
var traceID, propagatedSpanID int64
var sampledByte byte

if err := binary.Read(contextReader, binary.BigEndian, &traceID); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &traceID); err != nil {
if err == io.EOF {
return nil, opentracing.ErrTraceNotFound
}
return nil, opentracing.ErrTraceCorrupted
}
if err := binary.Read(contextReader, binary.BigEndian, &propagatedSpanID); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &propagatedSpanID); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
if err := binary.Read(contextReader, binary.BigEndian, &sampledByte); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &sampledByte); err != nil {
return nil, opentracing.ErrTraceCorrupted
}

// Handle the baggage.
baggageReader := bytes.NewReader(splitBinaryCarrier.Baggage)
var numBaggage int32
if err := binary.Read(baggageReader, binary.BigEndian, &numBaggage); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
iNumBaggage := int(numBaggage)
Expand All @@ -229,20 +220,20 @@ func (p *splitBinaryPropagator) Join(
baggageMap = make(map[string]string, iNumBaggage)
var keyLen, valLen int32
for i := 0; i < iNumBaggage; i++ {
if err := binary.Read(baggageReader, binary.BigEndian, &keyLen); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
buf.Grow(int(keyLen))
if n, err := io.CopyN(&buf, baggageReader, int64(keyLen)); err != nil || int32(n) != keyLen {
if n, err := io.CopyN(&buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen {
return nil, opentracing.ErrTraceCorrupted
}
key := buf.String()
buf.Reset()

if err := binary.Read(baggageReader, binary.BigEndian, &valLen); err != nil {
if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
if n, err := io.CopyN(&buf, baggageReader, int64(valLen)); err != nil || int32(n) != valLen {
if n, err := io.CopyN(&buf, carrier, int64(valLen)); err != nil || int32(n) != valLen {
return nil, opentracing.ErrTraceCorrupted
}
baggageMap[key] = buf.String()
Expand All @@ -268,59 +259,3 @@ func (p *splitBinaryPropagator) Join(
nil,
), nil
}

const (
tracerStateHeaderName = "Tracer-State"
traceBaggageHeaderName = "Trace-Baggage"
)

func (p *goHTTPPropagator) Inject(
sp opentracing.Span,
carrier interface{},
) error {
// Defer to SplitBinary for the real work.
splitBinaryCarrier := opentracing.NewSplitBinaryCarrier()
if err := p.splitBinaryPropagator.Inject(sp, splitBinaryCarrier); err != nil {
return err
}

// Encode into the HTTP header as two base64 strings.
header := carrier.(http.Header)
header.Add(tracerStateHeaderName, base64.StdEncoding.EncodeToString(
splitBinaryCarrier.TracerState))
header.Add(traceBaggageHeaderName, base64.StdEncoding.EncodeToString(
splitBinaryCarrier.Baggage))

return nil
}

func (p *goHTTPPropagator) Join(
operationName string,
carrier interface{},
) (opentracing.Span, error) {
// Decode the two base64-encoded data blobs from the HTTP header.
header := carrier.(http.Header)
tracerStateBase64, found := header[http.CanonicalHeaderKey(tracerStateHeaderName)]
if !found || len(tracerStateBase64) == 0 {
return nil, opentracing.ErrTraceNotFound
}
traceBaggageBase64, found := header[http.CanonicalHeaderKey(traceBaggageHeaderName)]
if !found || len(traceBaggageBase64) == 0 {
return nil, opentracing.ErrTraceNotFound
}
tracerStateBinary, err := base64.StdEncoding.DecodeString(tracerStateBase64[0])
if err != nil {
return nil, opentracing.ErrTraceCorrupted
}
traceBaggageBinary, err := base64.StdEncoding.DecodeString(traceBaggageBase64[0])
if err != nil {
return nil, opentracing.ErrTraceCorrupted
}

// Defer to SplitBinary for the real work.
splitBinaryCarrier := &opentracing.SplitBinaryCarrier{
TracerState: tracerStateBinary,
Baggage: traceBaggageBinary,
}
return p.splitBinaryPropagator.Join(operationName, splitBinaryCarrier)
}
22 changes: 8 additions & 14 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ func DefaultOptions() Options {
// NewWithOptions creates a customized Tracer.
func NewWithOptions(opts Options) opentracing.Tracer {
rval := &tracerImpl{Options: opts}
rval.textPropagator = &splitTextPropagator{rval}
rval.binaryPropagator = &splitBinaryPropagator{rval}
rval.goHTTPPropagator = &goHTTPPropagator{rval.binaryPropagator}
rval.textPropagator = &textMapPropagator{rval}
rval.binaryPropagator = &binaryPropagator{rval}
rval.accessorPropagator = &accessorPropagator{rval}
return rval
}
Expand All @@ -99,9 +98,8 @@ func New(recorder SpanRecorder) opentracing.Tracer {
// Implements the `Tracer` interface.
type tracerImpl struct {
Options
textPropagator *splitTextPropagator
binaryPropagator *splitBinaryPropagator
goHTTPPropagator *goHTTPPropagator
textPropagator *textMapPropagator
binaryPropagator *binaryPropagator
accessorPropagator *accessorPropagator
}

Expand Down Expand Up @@ -189,12 +187,10 @@ var Delegator delegatorType

func (t *tracerImpl) Inject(sp opentracing.Span, format interface{}, carrier interface{}) error {
switch format {
case opentracing.SplitText:
case opentracing.TextMap:
return t.textPropagator.Inject(sp, carrier)
case opentracing.SplitBinary:
case opentracing.Binary:
return t.binaryPropagator.Inject(sp, carrier)
case opentracing.GoHTTPHeader:
return t.goHTTPPropagator.Inject(sp, carrier)
}
if _, ok := format.(delegatorType); ok {
return t.accessorPropagator.Inject(sp, carrier)
Expand All @@ -204,12 +200,10 @@ func (t *tracerImpl) Inject(sp opentracing.Span, format interface{}, carrier int

func (t *tracerImpl) Join(operationName string, format interface{}, carrier interface{}) (opentracing.Span, error) {
switch format {
case opentracing.SplitText:
case opentracing.TextMap:
return t.textPropagator.Join(operationName, carrier)
case opentracing.SplitBinary:
case opentracing.Binary:
return t.binaryPropagator.Join(operationName, carrier)
case opentracing.GoHTTPHeader:
return t.goHTTPPropagator.Join(operationName, carrier)
}
if _, ok := format.(delegatorType); ok {
return t.accessorPropagator.Join(operationName, carrier)
Expand Down

0 comments on commit 291bfac

Please sign in to comment.