From 5c7db8cce75136d4e8ef095a1d17044af8f94fbd Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 14 Jul 2020 17:04:37 +0200 Subject: [PATCH] Fix overflowing batch size (#1310) * Fix overflowing batch size Signed-off-by: Pavol Loffay * Use max size Signed-off-by: Pavol Loffay --- processor/batchprocessor/README.md | 4 + processor/batchprocessor/batch_processor.go | 28 +++-- .../batchprocessor/batch_processor_test.go | 42 +++++++ processor/batchprocessor/config.go | 4 + processor/batchprocessor/config_test.go | 6 +- processor/batchprocessor/splittraces.go | 67 +++++++++++ processor/batchprocessor/splittraces_test.go | 113 ++++++++++++++++++ processor/batchprocessor/testdata/config.yaml | 1 + 8 files changed, 256 insertions(+), 9 deletions(-) create mode 100644 processor/batchprocessor/splittraces.go create mode 100644 processor/batchprocessor/splittraces_test.go diff --git a/processor/batchprocessor/README.md b/processor/batchprocessor/README.md index fa9cb0b58c9..b0b44d32714 100644 --- a/processor/batchprocessor/README.md +++ b/processor/batchprocessor/README.md @@ -19,6 +19,10 @@ The following configuration options can be modified: batch will be sent. - `timeout` (default = 200ms): Time duration after which a batch will be sent regardless of size. +- `send_batch_max_size` (default = 0): The maximum number of items in a batch. + This property ensures that larger batches are split into smaller units. + By default (`0`), there is no upper limit of the batch size. + It is currently supported only for the trace pipeline. Examples: diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 5a6aa50698e..d4a29d27b8e 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -43,8 +43,9 @@ type batchProcessor struct { name string logger *zap.Logger - sendBatchSize uint32 - timeout time.Duration + sendBatchSize uint32 + timeout time.Duration + sendBatchMaxSize uint32 timer *time.Timer done chan struct{} @@ -74,11 +75,12 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc name: cfg.Name(), logger: params.Logger, - sendBatchSize: cfg.SendBatchSize, - timeout: cfg.Timeout, - done: make(chan struct{}, 1), - newItem: make(chan interface{}, runtime.NumCPU()), - batch: batch, + sendBatchSize: cfg.SendBatchSize, + sendBatchMaxSize: cfg.SendBatchMaxSize, + timeout: cfg.Timeout, + done: make(chan struct{}, 1), + newItem: make(chan interface{}, runtime.NumCPU()), + batch: batch, } } @@ -114,6 +116,18 @@ func (bp *batchProcessor) startProcessingCycle() { close(bp.done) return } + if bp.sendBatchMaxSize > 0 { + if td, ok := item.(pdata.Traces); ok { + itemCount := bp.batch.itemCount() + if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize { + tdRemainSize := splitTrace(int(bp.sendBatchSize-itemCount), td) + item = tdRemainSize + go func() { + bp.newItem <- td + }() + } + } + } bp.batch.add(item) if bp.batch.itemCount() >= bp.sendBatchSize { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index c4330db93c8..776fc29b1c0 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -74,6 +74,48 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { } } +func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { + sink := &exportertest.SinkTraceExporter{} + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.SendBatchMaxSize 
= 128 + creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()} + batcher := newBatchTracesProcessor(creationParams, sink, cfg) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 1000 + spansPerRequest := 150 + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraceDataManySpansSameResource(spansPerRequest) + spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + } + + // Added to test logic that check for empty resources. + td := testdata.GenerateTraceDataEmpty() + batcher.ConsumeTraces(context.Background(), td) + + // wait for all spans to be reported + for { + if sink.SpansCount() == requestCount*spansPerRequest { + break + } + <-time.After(cfg.Timeout) + } + + require.NoError(t, batcher.Shutdown(context.Background())) + + require.Equal(t, requestCount*spansPerRequest, sink.SpansCount()) + for i := 0; i < len(sink.AllTraces())-1; i++ { + assert.Equal(t, cfg.SendBatchSize, uint32(sink.AllTraces()[i].SpanCount())) + } + // the last batch has the remaining size + assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount()) +} + func TestBatchProcessorSentBySize(t *testing.T) { views := MetricViews() require.NoError(t, view.Register(views...)) diff --git a/processor/batchprocessor/config.go b/processor/batchprocessor/config.go index 7bb72c0843e..9713628f2cc 100644 --- a/processor/batchprocessor/config.go +++ b/processor/batchprocessor/config.go @@ -29,4 +29,8 @@ type Config struct { // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. SendBatchSize uint32 `mapstructure:"send_batch_size,omitempty"` + + // SendBatchMaxSize is the maximum size of a batch. Larger batches are split into smaller units. + // Default value is 0, that means no maximum size. + SendBatchMaxSize uint32 `mapstructure:"send_batch_max_size,omitempty"` } diff --git a/processor/batchprocessor/config_test.go b/processor/batchprocessor/config_test.go index bbf02a8d099..b46b1e7de37 100644 --- a/processor/batchprocessor/config_test.go +++ b/processor/batchprocessor/config_test.go @@ -44,6 +44,7 @@ func TestLoadConfig(t *testing.T) { timeout := time.Second * 10 sendBatchSize := uint32(10000) + sendBatchMaxSize := uint32(11000) assert.Equal(t, p1, &Config{ @@ -51,7 +52,8 @@ func TestLoadConfig(t *testing.T) { TypeVal: "batch", NameVal: "batch/2", }, - SendBatchSize: sendBatchSize, - Timeout: timeout, + SendBatchSize: sendBatchSize, + SendBatchMaxSize: sendBatchMaxSize, + Timeout: timeout, }) } diff --git a/processor/batchprocessor/splittraces.go b/processor/batchprocessor/splittraces.go new file mode 100644 index 00000000000..10f4a63878a --- /dev/null +++ b/processor/batchprocessor/splittraces.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchprocessor + +import ( + "go.opentelemetry.io/collector/consumer/pdata" +) + +// splitTrace removes spans from the input trace and returns a new trace of the specified size. +func splitTrace(size int, toSplit pdata.Traces) pdata.Traces { + if toSplit.SpanCount() <= size { + return toSplit + } + copiedSpans := 0 + result := pdata.NewTraces() + rss := toSplit.ResourceSpans() + for i := rss.Len() - 1; i >= 0; i-- { + rs := rss.At(i) + destRs := pdata.NewResourceSpans() + destRs.InitEmpty() + rs.Resource().CopyTo(destRs.Resource()) + result.ResourceSpans().Append(&destRs) + + for j := rs.InstrumentationLibrarySpans().Len() - 1; j >= 0; j-- { + instSpans := rs.InstrumentationLibrarySpans().At(j) + destInstSpans := pdata.NewInstrumentationLibrarySpans() + destInstSpans.InitEmpty() + destRs.InstrumentationLibrarySpans().Append(&destInstSpans) + instSpans.InstrumentationLibrary().CopyTo(destInstSpans.InstrumentationLibrary()) + + if size-copiedSpans >= instSpans.Spans().Len() { + destInstSpans.Spans().Resize(instSpans.Spans().Len()) + } else { + destInstSpans.Spans().Resize(size - copiedSpans) + } + for k, destIdx := instSpans.Spans().Len()-1, 0; k >= 0 && copiedSpans < size; k, destIdx = k-1, destIdx+1 { + span := instSpans.Spans().At(k) + span.CopyTo(destInstSpans.Spans().At(destIdx)) + copiedSpans++ + // remove span + instSpans.Spans().Resize(instSpans.Spans().Len() - 1) + } + if instSpans.Spans().Len() == 0 { + rs.InstrumentationLibrarySpans().Resize(rs.InstrumentationLibrarySpans().Len() - 1) + } + if copiedSpans == size { + return result + } + } + if rs.InstrumentationLibrarySpans().Len() == 0 { + rss.Resize(rss.Len() - 1) + } + } + return result +} diff --git a/processor/batchprocessor/splittraces_test.go b/processor/batchprocessor/splittraces_test.go new file mode 100644 index 00000000000..175849f4fa9 --- /dev/null +++ b/processor/batchprocessor/splittraces_test.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package batchprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/data/testdata" +) + +func TestSplitTraces_noop(t *testing.T) { + td := testdata.GenerateTraceDataManySpansSameResource(20) + splitSize := 40 + split := splitTrace(splitSize, td) + assert.Equal(t, td, split) + + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(5) + assert.EqualValues(t, td, split) +} + +func TestSplitTraces(t *testing.T) { + td := testdata.GenerateTraceDataManySpansSameResource(20) + spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spans.At(i).SetName(getTestSpanName(0, i)) + } + cp := pdata.NewTraces() + cp.ResourceSpans().Resize(1) + cp.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(5) + cpSpans := cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() + td.ResourceSpans().At(0).Resource().CopyTo( + cp.ResourceSpans().At(0).Resource()) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary().CopyTo( + cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary()) + spans.At(19).CopyTo(cpSpans.At(0)) + spans.At(18).CopyTo(cpSpans.At(1)) + spans.At(17).CopyTo(cpSpans.At(2)) + spans.At(16).CopyTo(cpSpans.At(3)) + spans.At(15).CopyTo(cpSpans.At(4)) + + splitSize := 5 + split := splitTrace(splitSize, td) + assert.Equal(t, splitSize, split.SpanCount()) + assert.Equal(t, cp, split) + assert.Equal(t, 15, td.SpanCount()) + assert.Equal(t, "test-span-0-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name()) + assert.Equal(t, "test-span-0-15", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(4).Name()) +} + +func TestSplitTracesMultipleResourceSpans(t *testing.T) { + td := testdata.GenerateTraceDataManySpansSameResource(20) + spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spans.At(i).SetName(getTestSpanName(0, i)) + } + td.ResourceSpans().Resize(2) + // add second index to resource spans + testdata.GenerateTraceDataManySpansSameResource(20). + ResourceSpans().At(0).CopyTo(td.ResourceSpans().At(1)) + spans = td.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spans.At(i).SetName(getTestSpanName(1, i)) + } + + splitSize := 5 + split := splitTrace(splitSize, td) + assert.Equal(t, splitSize, split.SpanCount()) + assert.Equal(t, 35, td.SpanCount()) + assert.Equal(t, "test-span-1-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name()) + assert.Equal(t, "test-span-1-15", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(4).Name()) +} + +func TestSplitTracesMultipleResourceSpans_split_size_greater_than_span_size(t *testing.T) { + td := testdata.GenerateTraceDataManySpansSameResource(20) + spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spans.At(i).SetName(getTestSpanName(0, i)) + } + td.ResourceSpans().Resize(2) + // add second index to resource spans + testdata.GenerateTraceDataManySpansSameResource(20). 
+ ResourceSpans().At(0).CopyTo(td.ResourceSpans().At(1)) + spans = td.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans() + for i := 0; i < spans.Len(); i++ { + spans.At(i).SetName(getTestSpanName(1, i)) + } + + splitSize := 25 + split := splitTrace(splitSize, td) + assert.Equal(t, splitSize, split.SpanCount()) + assert.Equal(t, 40-splitSize, td.SpanCount()) + assert.Equal(t, 1, td.ResourceSpans().Len()) + assert.Equal(t, "test-span-1-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name()) + assert.Equal(t, "test-span-1-0", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(19).Name()) + assert.Equal(t, "test-span-0-19", split.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans().At(0).Name()) + assert.Equal(t, "test-span-0-15", split.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans().At(4).Name()) +} diff --git a/processor/batchprocessor/testdata/config.yaml b/processor/batchprocessor/testdata/config.yaml index 6631dd512a9..7cfcda24075 100644 --- a/processor/batchprocessor/testdata/config.yaml +++ b/processor/batchprocessor/testdata/config.yaml @@ -6,6 +6,7 @@ processors: batch/2: timeout: 10s send_batch_size: 10000 + send_batch_max_size: 11000 exporters: exampleexporter:
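
The new splitTrace helper both returns a trace holding exactly the requested number of spans and removes those spans from its input, which makes it easy to compose. Below is a minimal sketch, not part of the patch, of how it could be used to break one oversized pdata.Traces into pieces of at most maxSize spans; the splitIntoChunks name and the chunking loop are hypothetical illustration only, since the processor itself splits at most once per received item and re-enqueues the remainder on its newItem channel.

package batchprocessor

import "go.opentelemetry.io/collector/consumer/pdata"

// splitIntoChunks is a hypothetical helper (illustration only): it repeatedly
// calls the patch's splitTrace to carve pieces of at most maxSize spans off
// td, then returns those pieces together with the final remainder.
func splitIntoChunks(td pdata.Traces, maxSize int) []pdata.Traces {
	var chunks []pdata.Traces
	// While td still holds more than maxSize spans, splitTrace moves maxSize
	// spans out of td into a new Traces. Once td is at or below maxSize,
	// splitTrace would return td unchanged, so the loop stops before that.
	for td.SpanCount() > maxSize {
		chunks = append(chunks, splitTrace(maxSize, td))
	}
	return append(chunks, td)
}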
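
As a worked example of the run-time path added to batch_processor.go: with the settings used in TestBatchProcessorSpansDeliveredEnforceBatchSize (send_batch_size = 128, send_batch_max_size = 128), a 150-span request arriving while the pending batch is empty is split so that 128 spans are added and exported immediately, while the remaining 22 spans are handed back through the newItem channel and end up in a later batch. Across the whole test, 1000 requests × 150 spans = 150,000 spans therefore arrive as 1,171 batches of exactly 128 spans plus one final batch of 150,000 mod 128 = 112 spans, which is what the test's closing assertions check.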