Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zipkinv1: move to translator and encoders interfaces #3419

Merged
merged 3 commits into from
Jun 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 4 additions & 37 deletions receiver/kafkareceiver/zipkin_unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@
package kafkareceiver

import (
"context"
"encoding/json"

"github.com/apache/thrift/lib/go/thrift"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"

"go.opentelemetry.io/collector/consumer/pdata"
zipkintranslator "go.opentelemetry.io/collector/translator/trace/zipkin"
"go.opentelemetry.io/collector/translator/trace/zipkinv1"
"go.opentelemetry.io/collector/translator/trace/zipkinv2"
)

var v1ThriftUnmarshaler = zipkinv1.NewThriftTracesUnmarshaler()

type zipkinProtoSpanUnmarshaler struct {
}

Expand Down Expand Up @@ -71,41 +70,9 @@ type zipkinThriftSpanUnmarshaler struct {
var _ TracesUnmarshaler = (*zipkinThriftSpanUnmarshaler)(nil)

func (z zipkinThriftSpanUnmarshaler) Unmarshal(bytes []byte) (pdata.Traces, error) {
spans, err := deserializeZipkinThrift(bytes)
if err != nil {
return pdata.NewTraces(), err
}
return zipkintranslator.V1ThriftBatchToInternalTraces(spans)

return v1ThriftUnmarshaler.Unmarshal(bytes)
}

func (z zipkinThriftSpanUnmarshaler) Encoding() string {
return "zipkin_thrift"
}

// deserializeThrift decodes Thrift bytes to a list of spans.
// This code comes from jaegertracing/jaeger, ideally we should have imported
// it but this was creating many conflicts so brought the code to here.
// https://github.com/jaegertracing/jaeger/blob/6bc0c122bfca8e737a747826ae60a22a306d7019/model/converter/thrift/zipkin/deserialize.go#L36
func deserializeZipkinThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolConf(buffer, nil)
_, size, err := transport.ReadListBegin(context.Background()) // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(context.Background(), transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}
return spans, nil
}
9 changes: 4 additions & 5 deletions receiver/kafkareceiver/zipkin_unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import (

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
zipkintranslator "go.opentelemetry.io/collector/translator/trace/zipkin"
"go.opentelemetry.io/collector/translator/trace/zipkinv2"
)

var fromTranslator zipkinv2.FromTranslator
var v2FromTranslator zipkinv2.FromTranslator

func TestUnmarshalZipkin(t *testing.T) {
td := pdata.NewTraces()
Expand All @@ -46,7 +45,7 @@ func TestUnmarshalZipkin(t *testing.T) {
span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.SetParentSpanID(pdata.NewSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0}))
ret, err := fromTranslator.FromTraces(td)
ret, err := v2FromTranslator.FromTraces(td)
require.NoError(t, err)
spans := ret.([]*zipkinmodel.SpanModel)

Expand All @@ -62,7 +61,7 @@ func TestUnmarshalZipkin(t *testing.T) {
require.NoError(t, err)
require.NoError(t, protocolTransport.WriteListEnd(context.Background()))

tdThrift, err := zipkintranslator.V1ThriftBatchToInternalTraces([]*zipkincore.Span{tSpan})
tdThrift, err := v1ThriftUnmarshaler.Unmarshal(thriftTransport.Buffer.Bytes())
require.NoError(t, err)

protoBytes, err := new(zipkin_proto3.SpanSerializer).Serialize(spans)
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestUnmarshalZipkin(t *testing.T) {
func TestUnmarshalZipkinThrift_error(t *testing.T) {
p := zipkinThriftSpanUnmarshaler{}
got, err := p.Unmarshal([]byte("+$%"))
assert.Equal(t, pdata.NewTraces(), got)
assert.Equal(t, pdata.Traces{}, got)
assert.Error(t, err)
}

Expand Down
25 changes: 12 additions & 13 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"strings"
"sync"

jaegerzipkin "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/proto/zipkin_proto3"

Expand All @@ -39,7 +38,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/model"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/translator/trace/zipkin"
"go.opentelemetry.io/collector/translator/trace/zipkinv1"
"go.opentelemetry.io/collector/translator/trace/zipkinv2"
)

Expand All @@ -66,6 +65,9 @@ type ZipkinReceiver struct {
server *http.Server
config *Config
translator model.ToTracesTranslator

v1ThriftUnmarshaler model.TracesUnmarshaler
v1JSONUnmarshaler model.TracesUnmarshaler
}

var _ http.Handler = (*ZipkinReceiver)(nil)
Expand All @@ -77,10 +79,12 @@ func New(config *Config, nextConsumer consumer.Traces) (*ZipkinReceiver, error)
}

zr := &ZipkinReceiver{
nextConsumer: nextConsumer,
id: config.ID(),
config: config,
translator: zipkinv2.ToTranslator{ParseStringTags: config.ParseStringTags},
nextConsumer: nextConsumer,
id: config.ID(),
config: config,
translator: zipkinv2.ToTranslator{ParseStringTags: config.ParseStringTags},
v1ThriftUnmarshaler: zipkinv1.NewThriftTracesUnmarshaler(),
v1JSONUnmarshaler: zipkinv1.NewJSONTracesUnmarshaler(config.ParseStringTags),
}
return zr, nil
}
Expand Down Expand Up @@ -116,14 +120,9 @@ func (zr *ZipkinReceiver) Start(_ context.Context, host component.Host) error {
// v1ToTraceSpans parses Zipkin v1 JSON traces and converts them to OpenCensus Proto spans.
func (zr *ZipkinReceiver) v1ToTraceSpans(blob []byte, hdr http.Header) (reqs pdata.Traces, err error) {
if hdr.Get("Content-Type") == "application/x-thrift" {
zSpans, err := jaegerzipkin.DeserializeThrift(blob)
if err != nil {
return pdata.NewTraces(), err
}

return zipkin.V1ThriftBatchToInternalTraces(zSpans)
return zr.v1ThriftUnmarshaler.Unmarshal(blob)
}
return zipkin.V1JSONBatchToInternalTraces(blob, zr.config.ParseStringTags)
return zr.v1JSONUnmarshaler.Unmarshal(blob)
}

// v2ToTraceSpans parses Zipkin v2 JSON or Protobuf traces and converts them to OpenCensus Proto spans.
Expand Down
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (
const (
zipkinV2Single = "../../translator/trace/zipkinv2/testdata/zipkin_v2_single.json"
zipkinV2NoTimestamp = "../../translator/trace/zipkinv2/testdata/zipkin_v2_notimestamp.json"
zipkinV1SingleBatch = "../../translator/trace/zipkin/testdata/zipkin_v1_single_batch.json"
zipkinV1SingleBatch = "../../translator/trace/zipkinv1/testdata/zipkin_v1_single_batch.json"
)

var zipkinReceiverID = config.NewIDWithName(typeStr, "receiver_test")
Expand Down
34 changes: 0 additions & 34 deletions translator/trace/zipkin/zipkinv1_thrift_to_traces.go

This file was deleted.

39 changes: 0 additions & 39 deletions translator/trace/zipkin/zipkinv1_thrift_to_traces_test.go

This file was deleted.

42 changes: 0 additions & 42 deletions translator/trace/zipkin/zipkinv1_to_traces_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin
package zipkinv1

import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin
package zipkinv1

// https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186
const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin
package zipkinv1

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin
package zipkinv1

import (
"encoding/json"
Expand All @@ -28,8 +28,10 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"go.opentelemetry.io/collector/consumer/pdata"
idutils "go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/internal/idutils"
"go.opentelemetry.io/collector/internal/model"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
"go.opentelemetry.io/collector/translator/trace/zipkin"
)

var (
Expand All @@ -39,14 +41,30 @@ var (
msgZipkinV1SpanIDError = "zipkinV1 span id"
msgZipkinV1ParentIDError = "zipkinV1 span parentId"
// Generic hex to ID conversion errors
errHexTraceIDWrongLen = errors.New("hex traceId span has wrong length (expected 16 or 32)")
errHexTraceIDParsing = errors.New("failed to parse hex traceId")
errHexTraceIDZero = errors.New("traceId is zero")
errHexIDWrongLen = errors.New("hex Id has wrong length (expected 16)")
errHexIDParsing = errors.New("failed to parse hex Id")
errHexIDZero = errors.New("ID is zero")
errHexTraceIDWrongLen = errors.New("hex traceId span has wrong length (expected 16 or 32)")
errHexTraceIDParsing = errors.New("failed to parse hex traceId")
errHexTraceIDZero = errors.New("traceId is zero")
errHexIDWrongLen = errors.New("hex Id has wrong length (expected 16)")
errHexIDParsing = errors.New("failed to parse hex Id")
errHexIDZero = errors.New("ID is zero")
_ model.TracesDecoder = (*jsonDecoder)(nil)
)

type jsonDecoder struct {
// ParseStringTags should be set to true if tags should be converted to numbers when possible.
ParseStringTags bool
}

// DecodeTraces from JSON bytes.
func (j jsonDecoder) DecodeTraces(buf []byte) (interface{}, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Would be nice to stop using OC as intermediate representation, i.e. not related to this PR :) )

return v1JSONBatchToOCProto(buf, j.ParseStringTags)
}

// NewJSONTracesUnmarshaler returns an unmarshaler for Zipkin JSON.
func NewJSONTracesUnmarshaler(parseStringTags bool) model.TracesUnmarshaler {
return model.NewTracesUnmarshaler(jsonDecoder{ParseStringTags: parseStringTags}, toTranslator{})
}

// Trace translation from Zipkin V1 is a bit of special case since there is no model
// defined in golang for Zipkin V1 spans and there is no need to define one here, given
// that the zipkinV1Span defined below is as defined at:
Expand Down Expand Up @@ -254,7 +272,7 @@ func parseAnnotationValue(value string, parseStringTags bool) *tracepb.Attribute
pbAttrib := &tracepb.AttributeValue{}

if parseStringTags {
switch DetermineValueType(value) {
switch zipkin.DetermineValueType(value) {
case pdata.AttributeValueTypeInt:
iValue, _ := strconv.ParseInt(value, 10, 64)
pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue}
Expand Down Expand Up @@ -509,7 +527,7 @@ func setTimestampsIfUnset(span *tracepb.Span) {
if span.Attributes.AttributeMap == nil {
span.Attributes.AttributeMap = make(map[string]*tracepb.AttributeValue, 1)
}
span.Attributes.AttributeMap[StartTimeAbsent] = &tracepb.AttributeValue{
span.Attributes.AttributeMap[zipkin.StartTimeAbsent] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_BoolValue{
BoolValue: true,
}}
Expand Down
Loading