Skip to content

Commit

Permalink
Refactor batch processor export method (open-telemetry#11309)
Browse files Browse the repository at this point in the history
#### Description

Split the internal `batcher.export()` interface into three methods. This
is a refactoring that was applied in
https://github.com/open-telemetry/otel-arrow/tree/main/collector/processor/concurrentbatchprocessor
and is being back-ported as part of open-telemetry#11308. The reason this refactoring
is needed is that the parent context of the export() request will be
manipulated in common code (vs signal-specific code) for tracing
support.

#### Link to tracking issue
Part of open-telemetry#11308

#### Testing
Existing tests cover this.

---------

Co-authored-by: Bogdan Drutu <[email protected]>
  • Loading branch information
2 people authored and jackgopack4 committed Oct 8, 2024
1 parent 1ef66a9 commit 3c09581
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
72 changes: 50 additions & 22 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,19 @@ type shard struct {
// batch is an interface generalizing the individual signal types.
type batch interface {
	// export sends a request previously built by splitBatch to the next
	// consumer, returning the consumer's error, if any. The req argument
	// is the signal-specific pdata type (e.g. ptrace.Traces).
	export(ctx context.Context, req any) error

	// splitBatch returns a full request built from pending items.
	// sendBatchMaxSize caps the number of items moved into the request;
	// sentBatchSize is the number of items actually included.
	splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req any)

	// itemCount returns the size of the current batch
	itemCount() int

	// add item to the current batch
	add(item any)

	// sizeBytes counts the OTLP encoding size of the batch
	sizeBytes(item any) int
}

var _ consumer.Traces = (*batchProcessor)(nil)
Expand Down Expand Up @@ -258,12 +264,18 @@ func (b *shard) resetTimer() {
}

// sendItems flushes pending data from the shard's batch: it builds one
// request capped at the configured max batch size, exports it, and records
// telemetry for the send. Export failures are logged and not retried here.
func (b *shard) sendItems(trigger trigger) {
	sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize)

	if err := b.batch.export(b.exportCtx, req); err != nil {
		b.processor.logger.Warn("Sender failed", zap.Error(err))
		return
	}

	// Computing the OTLP-encoded size is relatively costly, so it is done
	// only when detailed telemetry is enabled.
	bytes := 0
	if b.processor.telemetry.detailed {
		bytes = b.batch.sizeBytes(req)
	}
	b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down Expand Up @@ -394,10 +406,18 @@ func (bt *batchTraces) add(item any) {
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
// sizeBytes returns the OTLP encoding size of the given traces request.
func (bt *batchTraces) sizeBytes(data any) int {
	td := data.(ptrace.Traces)
	return bt.sizer.TracesSize(td)
}

// export forwards the given traces request to the next consumer.
func (bt *batchTraces) export(ctx context.Context, req any) error {
	return bt.nextConsumer.ConsumeTraces(ctx, req.(ptrace.Traces))
}

func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
var req ptrace.Traces
var sent int
var bytes int
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
req = splitTraces(sendBatchMaxSize, bt.traceData)
bt.spanCount -= sendBatchMaxSize
Expand All @@ -408,10 +428,7 @@ func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnB
bt.traceData = ptrace.NewTraces()
bt.spanCount = 0
}
if returnBytes {
bytes = bt.sizer.TracesSize(req)
}
return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req)
return sent, req
}

func (bt *batchTraces) itemCount() int {
Expand All @@ -429,10 +446,18 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}}
}

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
// sizeBytes returns the OTLP encoding size of the given metrics request.
func (bm *batchMetrics) sizeBytes(data any) int {
	md := data.(pmetric.Metrics)
	return bm.sizer.MetricsSize(md)
}

// export forwards the given metrics request to the next consumer.
func (bm *batchMetrics) export(ctx context.Context, req any) error {
	return bm.nextConsumer.ConsumeMetrics(ctx, req.(pmetric.Metrics))
}

func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
var req pmetric.Metrics
var sent int
var bytes int
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.dataPointCount -= sendBatchMaxSize
Expand All @@ -443,10 +468,8 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, return
bm.metricData = pmetric.NewMetrics()
bm.dataPointCount = 0
}
if returnBytes {
bytes = bm.sizer.MetricsSize(req)
}
return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req)

return sent, req
}

func (bm *batchMetrics) itemCount() int {
Expand Down Expand Up @@ -475,10 +498,18 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}}
}

func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
// sizeBytes returns the OTLP encoding size of the given logs request.
func (bl *batchLogs) sizeBytes(data any) int {
	ld := data.(plog.Logs)
	return bl.sizer.LogsSize(ld)
}

// export forwards the given logs request to the next consumer.
func (bl *batchLogs) export(ctx context.Context, req any) error {
	return bl.nextConsumer.ConsumeLogs(ctx, req.(plog.Logs))
}

func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) {
var req plog.Logs
var sent int
var bytes int

if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
req = splitLogs(sendBatchMaxSize, bl.logData)
Expand All @@ -490,10 +521,7 @@ func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnByt
bl.logData = plog.NewLogs()
bl.logCount = 0
}
if returnBytes {
bytes = bl.sizer.LogsSize(req)
}
return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req)
return sent, req
}

func (bl *batchLogs) itemCount() int {
Expand Down
3 changes: 2 additions & 1 deletion processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false)
sent, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize)
sendErr := batchMetrics.export(ctx, req)
require.NoError(t, sendErr)
require.Equal(t, sendBatchMaxSize, sent)
remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
Expand Down

0 comments on commit 3c09581

Please sign in to comment.