diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 4af8eaab42f..5934e4f2fbe 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -96,13 +96,19 @@ type shard struct { // batch is an interface generalizing the individual signal types. type batch interface { // export the current batch - export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error) + export(ctx context.Context, req any) error + + // splitBatch returns a full request built from pending items. + splitBatch(ctx context.Context, sendBatchMaxSize int) (sentBatchSize int, req any) // itemCount returns the size of the current batch itemCount() int // add item to the current batch add(item any) + + // sizeBytes counts the OTLP encoding size of the batch + sizeBytes(item any) int } var _ consumer.Traces = (*batchProcessor)(nil) @@ -258,12 +264,18 @@ func (b *shard) resetTimer() { } func (b *shard) sendItems(trigger trigger) { - sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize) + + err := b.batch.export(b.exportCtx, req) if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) - } else { - b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) + return + } + var bytes int + if b.processor.telemetry.detailed { + bytes = b.batch.sizeBytes(req) } + b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) } // singleShardBatcher is used when metadataKeys is empty, to avoid the @@ -394,10 +406,18 @@ func (bt *batchTraces) add(item any) { td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } -func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { +func (bt *batchTraces) sizeBytes(data any) int { + return bt.sizer.TracesSize(data.(ptrace.Traces)) +} + +func (bt *batchTraces) export(ctx context.Context, req any) error { + td := req.(ptrace.Traces) + return bt.nextConsumer.ConsumeTraces(ctx, td) +} + +func (bt *batchTraces) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { var req ptrace.Traces var sent int - var bytes int if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize { req = splitTraces(sendBatchMaxSize, bt.traceData) bt.spanCount -= sendBatchMaxSize @@ -408,10 +428,7 @@ func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnB bt.traceData = ptrace.NewTraces() bt.spanCount = 0 } - if returnBytes { - bytes = bt.sizer.TracesSize(req) - } - return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req) + return sent, req } func (bt *batchTraces) itemCount() int { @@ -429,10 +446,18 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}} } -func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { +func (bm *batchMetrics) sizeBytes(data any) int { + return bm.sizer.MetricsSize(data.(pmetric.Metrics)) +} + +func (bm *batchMetrics) export(ctx context.Context, req any) error { + md := req.(pmetric.Metrics) + return bm.nextConsumer.ConsumeMetrics(ctx, md) +} + +func (bm *batchMetrics) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { var req pmetric.Metrics var sent int - var bytes int if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) bm.dataPointCount -= sendBatchMaxSize @@ -443,10 +468,8 @@ func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, return bm.metricData = pmetric.NewMetrics() bm.dataPointCount = 0 } - if returnBytes { - bytes = bm.sizer.MetricsSize(req) - } - return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req) + + return sent, req } func (bm *batchMetrics) itemCount() int { @@ -475,10 +498,18 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}} } -func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) { +func (bl *batchLogs) sizeBytes(data any) int { + return bl.sizer.LogsSize(data.(plog.Logs)) +} + +func (bl *batchLogs) export(ctx context.Context, req any) error { + ld := req.(plog.Logs) + return bl.nextConsumer.ConsumeLogs(ctx, ld) +} + +func (bl *batchLogs) splitBatch(_ context.Context, sendBatchMaxSize int) (int, any) { var req plog.Logs var sent int - var bytes int if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize { req = splitLogs(sendBatchMaxSize, bl.logData) @@ -490,10 +521,7 @@ func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnByt bl.logData = plog.NewLogs() bl.logCount = 0 } - if returnBytes { - bytes = bl.sizer.LogsSize(req) - } - return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req) + return sent, req } func (bl *batchLogs) itemCount() int { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index eeaf7d3f965..de314d62171 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -686,7 +686,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { batchMetrics.add(md) require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) - sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false) + sent, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize) + sendErr := batchMetrics.export(ctx, req) require.NoError(t, sendErr) require.Equal(t, sendBatchMaxSize, sent) remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize