Skip to content

Commit

Permalink
Add convenience interface in consumertest that implements all consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Apr 1, 2021
1 parent 4dcd2b8 commit 9a30c1f
Show file tree
Hide file tree
Showing 40 changed files with 187 additions and 138 deletions.
9 changes: 2 additions & 7 deletions component/componenttest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)

Expand Down Expand Up @@ -75,15 +74,11 @@ func (f *nopExporterFactory) CreateLogsExporter(

var nopExporterInstance = &nopExporter{
Component: componenthelper.New(),
Traces: consumertest.NewTracesNop(),
Metrics: consumertest.NewMetricsNop(),
Logs: consumertest.NewLogsNop(),
Consumer: consumertest.NewNop(),
}

// nopExporter stores consumed traces and metrics for testing purposes.
type nopExporter struct {
component.Component
consumer.Traces
consumer.Metrics
consumer.Logs
consumertest.Consumer
}
8 changes: 2 additions & 6 deletions component/componenttest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ func (f *nopProcessorFactory) CreateLogsProcessor(

var nopProcessorInstance = &nopProcessor{
Component: componenthelper.New(),
Traces: consumertest.NewTracesNop(),
Metrics: consumertest.NewMetricsNop(),
Logs: consumertest.NewLogsNop(),
Consumer: consumertest.NewNop(),
}

// nopProcessor stores consumed traces and metrics for testing purposes.
type nopProcessor struct {
component.Component
consumer.Traces
consumer.Metrics
consumer.Logs
consumertest.Consumer
}

func (*nopProcessor) GetCapabilities() component.ProcessorCapabilities {
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ func TestNewNopProcessorFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &config.ProcessorSettings{TypeVal: factory.Type()}, cfg)

traces, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
traces, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, traces.GetCapabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewMetricsNop())
metrics, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, metrics.GetCapabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
logs, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, logs.GetCapabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/nop_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ func TestNewNopReceiverFactory(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &config.ReceiverSettings{TypeVal: factory.Type()}, cfg)

traces, err := factory.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewTracesNop())
traces, err := factory.CreateTracesReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.Shutdown(context.Background()))

metrics, err := factory.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewMetricsNop())
metrics, err := factory.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.Shutdown(context.Background()))

logs, err := factory.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewLogsNop())
logs, err := factory.CreateLogsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.Shutdown(context.Background()))
Expand Down
17 changes: 17 additions & 0 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package consumertest

import (
"go.opentelemetry.io/collector/consumer"
)

// Consumer is a convenience interface that implements all consumer interfaces.
// It has a private function on it to forbid external users to implement it,
// to allow us to add extra functions without breaking compatibility because
// nobody else implements this interface.
type Consumer interface {
consumer.Logs
consumer.Metrics
consumer.Traces

unexported()
}
12 changes: 12 additions & 0 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type errConsumer struct {
err error
}

func (er *errConsumer) unexported() {
panic("must not be called")
}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
return er.err
}
Expand All @@ -37,17 +41,25 @@ func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
return er.err
}

// NewErr returns a Consumer that just drops all received data and returns no error.
func NewErr(err error) Consumer {
return &errConsumer{err: err}
}

// NewTracesErr returns a consumer.Traces that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewTracesErr(err error) consumer.Traces {
return &errConsumer{err: err}
}

// NewMetricsErr returns a consumer.Metrics that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewMetricsErr(err error) consumer.Metrics {
return &errConsumer{err: err}
}

// NewLogsErr returns a consumer.Logs that just drops all received data and returns the given error.
// Deprecated: Use NewErr().
func NewLogsErr(err error) consumer.Logs {
return &errConsumer{err: err}
}
9 changes: 9 additions & 0 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestErr(t *testing.T) {
err := errors.New("my error")
ec := NewErr(err)
require.NotNil(t, ec)
assert.Equal(t, err, ec.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.Equal(t, err, ec.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.Equal(t, err, ec.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestTracesErr(t *testing.T) {
err := errors.New("my error")
nt := NewTracesErr(err)
Expand Down
12 changes: 12 additions & 0 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (

type nopConsumer struct{}

func (nc *nopConsumer) unexported() {
panic("must not be called")
}

func (nc *nopConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
return nil
}
Expand All @@ -39,17 +43,25 @@ func (nc *nopConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
return nil
}

// NewNop returns a Consumer that just drops all received data and returns no error.
func NewNop() Consumer {
return nopInstance
}

// NewTracesNop returns a consumer.Traces that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewTracesNop() consumer.Traces {
return nopInstance
}

// NewMetricsNop returns a consumer.Metrics that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewMetricsNop() consumer.Metrics {
return nopInstance
}

// NewLogsNop returns a consumer.Logs that just drops all received data and returns no error.
// Deprecated: Use NewNop().
func NewLogsNop() consumer.Logs {
return nopInstance
}
8 changes: 8 additions & 0 deletions consumer/consumertest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestNop(t *testing.T) {
nc := NewNop()
require.NotNil(t, nc)
assert.NoError(t, nc.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, nc.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, nc.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestTracesNop(t *testing.T) {
nt := NewTracesNop()
require.NotNil(t, nt)
Expand Down
6 changes: 3 additions & 3 deletions consumer/fanoutconsumer/cloningconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestTraceProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewTracesNop()
nop := consumertest.NewNop()
tfc := NewTracesCloning([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) {
}

func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewMetricsNop()
nop := consumertest.NewNop()
mfc := NewMetrics([]consumer.Metrics{nop})
assert.Same(t, nop, mfc)
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) {
}

func TestLogsProcessorCloningNotMultiplexing(t *testing.T) {
nop := consumertest.NewLogsNop()
nop := consumertest.NewNop()
lfc := NewLogsCloning([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}
Expand Down
12 changes: 6 additions & 6 deletions consumer/fanoutconsumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestTracesProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewTracesNop()
nop := consumertest.NewNop()
tfc := NewTraces([]consumer.Traces{nop})
assert.Same(t, nop, tfc)
}
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewTracesErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

tfc := NewTraces(processors)
td := testdata.GenerateTraceDataOneSpan()
Expand All @@ -81,7 +81,7 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
}

func TestMetricsProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewMetricsNop()
nop := consumertest.NewNop()
mfc := NewMetrics([]consumer.Metrics{nop})
assert.Same(t, nop, mfc)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewMetricsErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

mfc := NewMetrics(processors)
md := testdata.GenerateMetricsOneMetric()
Expand All @@ -135,7 +135,7 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
}

func TestLogsProcessorNotMultiplexing(t *testing.T) {
nop := consumertest.NewLogsNop()
nop := consumertest.NewNop()
lfc := NewLogs([]consumer.Logs{nop})
assert.Same(t, nop, lfc)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestLogsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1] = consumertest.NewLogsErr(errors.New("my error"))
processors[1] = consumertest.NewErr(errors.New("my error"))

lfc := NewLogs(processors)
ld := testdata.GenerateLogDataOneLog()
Expand Down
6 changes: 3 additions & 3 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestTraceInvalidUrl(t *testing.T) {
func TestTraceError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startTraceReceiver(t, addr, consumertest.NewTracesErr(errors.New("my_error")))
startTraceReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startTraceExporter(t, "", fmt.Sprintf("http://%s/v1/traces", addr))

td := testdata.GenerateTraceDataOneSpan()
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestCompressionOptions(t *testing.T) {
func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startMetricsReceiver(t, addr, consumertest.NewMetricsErr(errors.New("my_error")))
startMetricsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startMetricsExporter(t, "", fmt.Sprintf("http://%s/v1/metrics", addr))

md := testdata.GenerateMetricsOneMetric()
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestMetricsRoundTrip(t *testing.T) {
func TestLogsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

startLogsReceiver(t, addr, consumertest.NewLogsErr(errors.New("my_error")))
startLogsReceiver(t, addr, consumertest.NewErr(errors.New("my_error")))
exp := startLogsExporter(t, "", fmt.Sprintf("http://%s/v1/logs", addr))

md := testdata.GenerateLogDataOneLog()
Expand Down
12 changes: 6 additions & 6 deletions processor/attributesprocessor/attributes_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestLogProcessor_NilEmptyData(t *testing.T) {
}

tp, err := factory.CreateLogsProcessor(
context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewLogsNop())
context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)
for i := range testCases {
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestAttributes_FilterLogs(t *testing.T) {
},
Config: *createConfig(filterset.Strict),
}
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

Expand Down Expand Up @@ -237,7 +237,7 @@ func TestAttributes_FilterLogsByNameStrict(t *testing.T) {
LogNames: []string{"dont_apply"},
Config: *createConfig(filterset.Strict),
}
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

Expand Down Expand Up @@ -300,7 +300,7 @@ func TestAttributes_FilterLogsByNameRegexp(t *testing.T) {
LogNames: []string{".*dont_apply$"},
Config: *createConfig(filterset.Regexp),
}
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

Expand Down Expand Up @@ -359,7 +359,7 @@ func TestLogAttributes_Hash(t *testing.T) {
{Key: "user.authenticated", Action: processorhelper.HASH},
}

tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.Nil(t, err)
require.NotNil(t, tp)

Expand Down Expand Up @@ -403,7 +403,7 @@ func BenchmarkAttributes_FilterLogsByName(b *testing.B) {
oCfg.Include = &filterconfig.MatchProperties{
LogNames: []string{"^apply.*"},
}
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewLogsNop())
tp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.Nil(b, err)
require.NotNil(b, tp)

Expand Down
Loading

0 comments on commit 9a30c1f

Please sign in to comment.