Skip to content

Commit

Permalink
Wrap the error used in concurrentbatchprocessor (#235)
Browse files Browse the repository at this point in the history
A missing `func (_) Unwrap() error` makes the receiver not able to
unwrap the exporter errors.

Fixes #234.
  • Loading branch information
jmacd authored Jul 17, 2024
1 parent 410f79a commit f09bdeb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Wrap concurrentbatchprocessor errors [#235](https://github.com/open-telemetry/otel-arrow/pull/235)
- Update to OTel-Collector v0.105.0, which includes the OTel-Arrow components. [#233](https://github.com/open-telemetry/otel-arrow/pull/233)
- Remove the primary exporter/receiver components, update references and documentation. [#230](https://github.com/open-telemetry/otel-arrow/pull/230)
- Update to Arrow v17. [#231](https://github.com/open-telemetry/otel-arrow/pull/231)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (ce countedError) Error() string {
return fmt.Sprintf("batch error: %s", ce.err.Error())
}

func (ce countedError) Unwrap() error {
return ce.err
}

var _ consumer.Traces = (*batchProcessor)(nil)
var _ consumer.Metrics = (*batchProcessor)(nil)
var _ consumer.Logs = (*batchProcessor)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ import (
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

type testError struct{}

func (testError) Error() string {
return "test"
}

func TestErrorWrapping(t *testing.T) {
e := countedError{
err: fmt.Errorf("oops: %w", testError{}),
}
require.Error(t, e)
require.Contains(t, e.Error(), "oops: test")
require.ErrorIs(t, e, testError{})
}

func TestProcessorShutdown(t *testing.T) {
factory := NewFactory()

Expand Down Expand Up @@ -1697,3 +1712,40 @@ func TestBatchProcessorEmptyBatch(t *testing.T) {
wg.Wait()
require.NoError(t, batcher.Shutdown(context.Background()))
}

type errorSink struct {
err error
}

var _ consumer.Logs = errorSink{}

func (es errorSink) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (es errorSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return es.err
}

func TestErrorPropagation(t *testing.T) {
for _, proto := range []error{
testError{},
fmt.Errorf("womp"),
} {
sink := errorSink{err: proto}

creationSet := processortest.NewNopSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
cfg := createDefaultConfig().(*Config)
batcher, err := newBatchLogsProcessor(creationSet, sink, cfg)

require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

ld := testdata.GenerateLogs(1)
err = batcher.ConsumeLogs(context.Background(), ld)
assert.Error(t, err)
assert.ErrorIs(t, err, proto)
assert.Contains(t, err.Error(), proto.Error())
}
}

0 comments on commit f09bdeb

Please sign in to comment.