Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename marshall to marshal in all the occurrences #2977

Merged
merged 3 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

## 🛑 Breaking changes 🛑

- Change `With*Unmarshallers` signatures in Kafka exporter/receiver (#2973)
- Rename `marshall` to `marshal` in all the occurrences (#2977)

## 💡 Enhancements 💡

## 🧰 Bug fixes 🧰
Expand Down
26 changes: 13 additions & 13 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ const (
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// WithTracesMarshallers adds tracesMarshallers.
func WithTracesMarshallers(tracesMarshallers ...TracesMarshaller) FactoryOption {
// WithTracesMarshalers adds tracesMarshalers.
func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaller := range tracesMarshallers {
factory.tracesMarshallers[marshaller.Encoding()] = marshaller
for _, marshaler := range tracesMarshalers {
factory.tracesMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
tracesMarshallers: tracesMarshallers(),
metricsMarshallers: metricsMarshallers(),
logsMarshallers: logsMarshallers(),
tracesMarshalers: tracesMarshalers(),
metricsMarshalers: metricsMarshalers(),
logsMarshalers: logsMarshalers(),
}
for _, o := range options {
o(f)
Expand Down Expand Up @@ -90,9 +90,9 @@ func createDefaultConfig() config.Exporter {
}

type kafkaExporterFactory struct {
tracesMarshallers map[string]TracesMarshaller
metricsMarshallers map[string]MetricsMarshaller
logsMarshallers map[string]LogsMarshaller
tracesMarshalers map[string]TracesMarshaler
metricsMarshalers map[string]MetricsMarshaler
logsMarshalers map[string]LogsMarshaler
}

func (f *kafkaExporterFactory) createTracesExporter(
Expand All @@ -104,7 +104,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
exp, err := newTracesExporter(*oCfg, params, f.tracesMarshallers)
exp, err := newTracesExporter(*oCfg, params, f.tracesMarshalers)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +129,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
exp, err := newMetricsExporter(*oCfg, params, f.metricsMarshallers)
exp, err := newMetricsExporter(*oCfg, params, f.metricsMarshalers)
if err != nil {
return nil, err
}
Expand All @@ -154,7 +154,7 @@ func (f *kafkaExporterFactory) createLogsExporter(
if oCfg.Topic == "" {
oCfg.Topic = defaultLogsTopic
}
exp, err := newLogsExporter(*oCfg, params, f.logsMarshallers)
exp, err := newLogsExporter(*oCfg, params, f.logsMarshalers)
if err != nil {
return nil, err
}
Expand Down
28 changes: 14 additions & 14 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCreateTracesExporter(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
f := kafkaExporterFactory{tracesMarshallers: tracesMarshallers()}
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
Expand All @@ -53,7 +53,7 @@ func TestCreateMetricsExport(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{metricsMarshallers: metricsMarshallers()}
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
Expand All @@ -65,7 +65,7 @@ func TestCreateLogsExport(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{logsMarshallers: logsMarshallers()}
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
Expand All @@ -75,7 +75,7 @@ func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaExporterFactory{tracesMarshallers: tracesMarshallers()}
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
r, err := f.createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
// no available broker
require.Error(t, err)
Expand All @@ -86,7 +86,7 @@ func TestCreateMetricsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{metricsMarshallers: metricsMarshallers()}
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.Error(t, err)
assert.Nil(t, mr)
Expand All @@ -96,15 +96,15 @@ func TestCreateLogsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{logsMarshallers: logsMarshallers()}
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
mr, err := mf.createLogsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestWithMarshallers(t *testing.T) {
cm := &customMarshaller{}
f := NewFactory(WithTracesMarshallers(cm))
func TestWithMarshalers(t *testing.T) {
cm := &customMarshaler{}
f := NewFactory(WithTracesMarshalers(cm))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
Expand All @@ -116,22 +116,22 @@ func TestWithMarshallers(t *testing.T) {
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpTracesPbMarshaller).Encoding()
cfg.Encoding = new(otlpTracesPbMarshaler).Encoding()
exporter, err := f.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
})
}

type customMarshaller struct {
type customMarshaler struct {
}

var _ TracesMarshaller = (*customMarshaller)(nil)
var _ TracesMarshaler = (*customMarshaler)(nil)

func (c customMarshaller) Marshal(_ pdata.Traces) ([]Message, error) {
func (c customMarshaler) Marshal(_ pdata.Traces) ([]Message, error) {
panic("implement me")
}

func (c customMarshaller) Encoding() string {
func (c customMarshaler) Encoding() string {
return "custom"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

type jaegerMarshaller struct {
marshaller jaegerSpanMarshaller
type jaegerMarshaler struct {
marshaler jaegerSpanMarshaler
}

var _ TracesMarshaller = (*jaegerMarshaller)(nil)
var _ TracesMarshaler = (*jaegerMarshaler)(nil)

func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
func (j jaegerMarshaler) Marshal(traces pdata.Traces) ([]Message, error) {
batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
if err != nil {
return nil, err
Expand All @@ -41,7 +41,7 @@ func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
for _, batch := range batches {
for _, span := range batch.Spans {
span.Process = batch.Process
bts, err := j.marshaller.marshall(span)
bts, err := j.marshaler.marshal(span)
// continue to process spans that can be serialized
if err != nil {
errs = append(errs, err)
Expand All @@ -54,46 +54,46 @@ func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
return messages, consumererror.Combine(errs)
}

func (j jaegerMarshaller) Encoding() string {
return j.marshaller.encoding()
func (j jaegerMarshaler) Encoding() string {
return j.marshaler.encoding()
}

type jaegerSpanMarshaller interface {
marshall(span *jaegerproto.Span) ([]byte, error)
type jaegerSpanMarshaler interface {
marshal(span *jaegerproto.Span) ([]byte, error)
encoding() string
}

type jaegerProtoSpanMarshaller struct {
type jaegerProtoSpanMarshaler struct {
}

var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil)
var _ jaegerSpanMarshaler = (*jaegerProtoSpanMarshaler)(nil)

func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
func (p jaegerProtoSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error) {
return span.Marshal()
}

func (p jaegerProtoSpanMarshaller) encoding() string {
func (p jaegerProtoSpanMarshaler) encoding() string {
return "jaeger_proto"
}

type jaegerJSONSpanMarshaller struct {
pbMarshaller *jsonpb.Marshaler
type jaegerJSONSpanMarshaler struct {
pbMarshaler *jsonpb.Marshaler
}

var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)
var _ jaegerSpanMarshaler = (*jaegerJSONSpanMarshaler)(nil)

func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller {
return &jaegerJSONSpanMarshaller{
pbMarshaller: &jsonpb.Marshaler{},
func newJaegerJSONMarshaler() *jaegerJSONSpanMarshaler {
return &jaegerJSONSpanMarshaler{
pbMarshaler: &jsonpb.Marshaler{},
}
}

func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
func (p jaegerJSONSpanMarshaler) marshal(span *jaegerproto.Span) ([]byte, error) {
out := new(bytes.Buffer)
err := p.pbMarshaller.Marshal(out, span)
err := p.pbMarshaler.Marshal(out, span)
return out.Bytes(), err
}

func (p jaegerJSONSpanMarshaller) encoding() string {
func (p jaegerJSONSpanMarshaler) encoding() string {
return "jaeger_json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

func TestJaegerMarshaller(t *testing.T) {
func TestJaegerMarshaler(t *testing.T) {
td := pdata.NewTraces()
span := td.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("foo")
Expand All @@ -43,26 +43,26 @@ func TestJaegerMarshaller(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, jaegerProtoBytes)

jsonMarshaller := &jsonpb.Marshaler{}
jsonMarshaler := &jsonpb.Marshaler{}
jsonByteBuffer := new(bytes.Buffer)
require.NoError(t, jsonMarshaller.Marshal(jsonByteBuffer, batches[0].Spans[0]))
require.NoError(t, jsonMarshaler.Marshal(jsonByteBuffer, batches[0].Spans[0]))

tests := []struct {
unmarshaller TracesMarshaller
encoding string
messages []Message
unmarshaler TracesMarshaler
encoding string
messages []Message
}{
{
unmarshaller: jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
unmarshaler: jaegerMarshaler{
marshaler: jaegerProtoSpanMarshaler{},
},
encoding: "jaeger_proto",
messages: []Message{{Value: jaegerProtoBytes, Key: messageKey}},
},
{
unmarshaller: jaegerMarshaller{
marshaller: jaegerJSONSpanMarshaller{
pbMarshaller: &jsonpb.Marshaler{},
unmarshaler: jaegerMarshaler{
marshaler: jaegerJSONSpanMarshaler{
pbMarshaler: &jsonpb.Marshaler{},
},
},
encoding: "jaeger_json",
Expand All @@ -71,22 +71,22 @@ func TestJaegerMarshaller(t *testing.T) {
}
for _, test := range tests {
t.Run(test.encoding, func(t *testing.T) {
messages, err := test.unmarshaller.Marshal(td)
messages, err := test.unmarshaler.Marshal(td)
require.NoError(t, err)
assert.Equal(t, test.messages, messages)
assert.Equal(t, test.encoding, test.unmarshaller.Encoding())
assert.Equal(t, test.encoding, test.unmarshaler.Encoding())
})
}
}

func TestJaegerMarshaller_error_covert_traceID(t *testing.T) {
marshaller := jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
func TestJaegerMarshaler_error_covert_traceID(t *testing.T) {
marshaler := jaegerMarshaler{
marshaler: jaegerProtoSpanMarshaler{},
}
td := pdata.NewTraces()
td.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
// fails in zero traceID
messages, err := marshaller.Marshal(td)
messages, err := marshaler.Marshal(td)
require.Error(t, err)
assert.Nil(t, messages)
}
Loading