Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix overflowing batch size #1310

Merged
merged 2 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions processor/batchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ The following configuration options can be modified:
batch will be sent.
- `timeout` (default = 200ms): Time duration after which a batch will be sent
regardless of size.
- `send_batch_max_size` (default = 0): The maximum number of items in a batch.
This property ensures that larger batches are split into smaller units.
By default (`0`), there is no upper limit of the batch size.
It is currently supported only for the trace pipeline.

Examples:

Expand Down
28 changes: 21 additions & 7 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ type batchProcessor struct {
name string
logger *zap.Logger

sendBatchSize uint32
timeout time.Duration
sendBatchSize uint32
timeout time.Duration
sendBatchMaxSize uint32

timer *time.Timer
done chan struct{}
Expand Down Expand Up @@ -74,11 +75,12 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
name: cfg.Name(),
logger: params.Logger,

sendBatchSize: cfg.SendBatchSize,
timeout: cfg.Timeout,
done: make(chan struct{}, 1),
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
sendBatchSize: cfg.SendBatchSize,
sendBatchMaxSize: cfg.SendBatchMaxSize,
timeout: cfg.Timeout,
done: make(chan struct{}, 1),
newItem: make(chan interface{}, runtime.NumCPU()),
batch: batch,
}
}

Expand Down Expand Up @@ -114,6 +116,18 @@ func (bp *batchProcessor) startProcessingCycle() {
close(bp.done)
return
}
if bp.sendBatchMaxSize > 0 {
if td, ok := item.(pdata.Traces); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.SpanCount()) > bp.sendBatchMaxSize {
tdRemainSize := splitTrace(int(bp.sendBatchSize-itemCount), td)
item = tdRemainSize
go func() {
bp.newItem <- td
}()
}
}
}

bp.batch.add(item)
if bp.batch.itemCount() >= bp.sendBatchSize {
Expand Down
42 changes: 42 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,48 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
}
}

func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
sink := &exportertest.SinkTraceExporter{}
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 1000
spansPerRequest := 150
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraceDataManySpansSameResource(spansPerRequest)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
}
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}

// Added to test logic that check for empty resources.
td := testdata.GenerateTraceDataEmpty()
batcher.ConsumeTraces(context.Background(), td)

// wait for all spans to be reported
for {
if sink.SpansCount() == requestCount*spansPerRequest {
break
}
<-time.After(cfg.Timeout)
}

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
for i := 0; i < len(sink.AllTraces())-1; i++ {
assert.Equal(t, cfg.SendBatchSize, uint32(sink.AllTraces()[i].SpanCount()))
}
// the last batch has the remaining size
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
}

func TestBatchProcessorSentBySize(t *testing.T) {
views := MetricViews()
require.NoError(t, view.Register(views...))
Expand Down
4 changes: 4 additions & 0 deletions processor/batchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ type Config struct {

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
SendBatchSize uint32 `mapstructure:"send_batch_size,omitempty"`

// SendBatchMaxSize is the maximum size of a batch. Larger batches are split into smaller units.
// Default value is 0, that means no maximum size.
SendBatchMaxSize uint32 `mapstructure:"send_batch_max_size,omitempty"`
}
6 changes: 4 additions & 2 deletions processor/batchprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ func TestLoadConfig(t *testing.T) {

timeout := time.Second * 10
sendBatchSize := uint32(10000)
sendBatchMaxSize := uint32(11000)

assert.Equal(t, p1,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch/2",
},
SendBatchSize: sendBatchSize,
Timeout: timeout,
SendBatchSize: sendBatchSize,
SendBatchMaxSize: sendBatchMaxSize,
Timeout: timeout,
})
}
67 changes: 67 additions & 0 deletions processor/batchprocessor/splittraces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchprocessor

import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// splitTrace removes spans from the input trace and returns a new trace of the specified size.
func splitTrace(size int, toSplit pdata.Traces) pdata.Traces {
if toSplit.SpanCount() <= size {
return toSplit
}
copiedSpans := 0
result := pdata.NewTraces()
rss := toSplit.ResourceSpans()
for i := rss.Len() - 1; i >= 0; i-- {
rs := rss.At(i)
destRs := pdata.NewResourceSpans()
destRs.InitEmpty()
rs.Resource().CopyTo(destRs.Resource())
result.ResourceSpans().Append(&destRs)

for j := rs.InstrumentationLibrarySpans().Len() - 1; j >= 0; j-- {
instSpans := rs.InstrumentationLibrarySpans().At(j)
destInstSpans := pdata.NewInstrumentationLibrarySpans()
destInstSpans.InitEmpty()
destRs.InstrumentationLibrarySpans().Append(&destInstSpans)
instSpans.InstrumentationLibrary().CopyTo(destInstSpans.InstrumentationLibrary())

if size-copiedSpans >= instSpans.Spans().Len() {
destInstSpans.Spans().Resize(instSpans.Spans().Len())
} else {
destInstSpans.Spans().Resize(size - copiedSpans)
}
for k, destIdx := instSpans.Spans().Len()-1, 0; k >= 0 && copiedSpans < size; k, destIdx = k-1, destIdx+1 {
span := instSpans.Spans().At(k)
span.CopyTo(destInstSpans.Spans().At(destIdx))
copiedSpans++
// remove span
instSpans.Spans().Resize(instSpans.Spans().Len() - 1)
}
if instSpans.Spans().Len() == 0 {
rs.InstrumentationLibrarySpans().Resize(rs.InstrumentationLibrarySpans().Len() - 1)
}
if copiedSpans == size {
return result
}
}
if rs.InstrumentationLibrarySpans().Len() == 0 {
rss.Resize(rss.Len() - 1)
}
}
return result
}
113 changes: 113 additions & 0 deletions processor/batchprocessor/splittraces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchprocessor

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data/testdata"
)

func TestSplitTraces_noop(t *testing.T) {
td := testdata.GenerateTraceDataManySpansSameResource(20)
splitSize := 40
split := splitTrace(splitSize, td)
assert.Equal(t, td, split)

td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(5)
assert.EqualValues(t, td, split)
}

func TestSplitTraces(t *testing.T) {
td := testdata.GenerateTraceDataManySpansSameResource(20)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for i := 0; i < spans.Len(); i++ {
spans.At(i).SetName(getTestSpanName(0, i))
}
cp := pdata.NewTraces()
cp.ResourceSpans().Resize(1)
cp.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(5)
cpSpans := cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
td.ResourceSpans().At(0).Resource().CopyTo(
cp.ResourceSpans().At(0).Resource())
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary().CopyTo(
cp.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).InstrumentationLibrary())
spans.At(19).CopyTo(cpSpans.At(0))
spans.At(18).CopyTo(cpSpans.At(1))
spans.At(17).CopyTo(cpSpans.At(2))
spans.At(16).CopyTo(cpSpans.At(3))
spans.At(15).CopyTo(cpSpans.At(4))

splitSize := 5
split := splitTrace(splitSize, td)
assert.Equal(t, splitSize, split.SpanCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, td.SpanCount())
assert.Equal(t, "test-span-0-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name())
assert.Equal(t, "test-span-0-15", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(4).Name())
}

func TestSplitTracesMultipleResourceSpans(t *testing.T) {
td := testdata.GenerateTraceDataManySpansSameResource(20)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for i := 0; i < spans.Len(); i++ {
spans.At(i).SetName(getTestSpanName(0, i))
}
td.ResourceSpans().Resize(2)
// add second index to resource spans
testdata.GenerateTraceDataManySpansSameResource(20).
ResourceSpans().At(0).CopyTo(td.ResourceSpans().At(1))
spans = td.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans()
for i := 0; i < spans.Len(); i++ {
spans.At(i).SetName(getTestSpanName(1, i))
}

splitSize := 5
split := splitTrace(splitSize, td)
assert.Equal(t, splitSize, split.SpanCount())
assert.Equal(t, 35, td.SpanCount())
assert.Equal(t, "test-span-1-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name())
assert.Equal(t, "test-span-1-15", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(4).Name())
}

func TestSplitTracesMultipleResourceSpans_split_size_greater_than_span_size(t *testing.T) {
td := testdata.GenerateTraceDataManySpansSameResource(20)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for i := 0; i < spans.Len(); i++ {
spans.At(i).SetName(getTestSpanName(0, i))
}
td.ResourceSpans().Resize(2)
// add second index to resource spans
testdata.GenerateTraceDataManySpansSameResource(20).
ResourceSpans().At(0).CopyTo(td.ResourceSpans().At(1))
spans = td.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans()
for i := 0; i < spans.Len(); i++ {
spans.At(i).SetName(getTestSpanName(1, i))
}

splitSize := 25
split := splitTrace(splitSize, td)
assert.Equal(t, splitSize, split.SpanCount())
assert.Equal(t, 40-splitSize, td.SpanCount())
assert.Equal(t, 1, td.ResourceSpans().Len())
assert.Equal(t, "test-span-1-19", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Name())
assert.Equal(t, "test-span-1-0", split.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(19).Name())
assert.Equal(t, "test-span-0-19", split.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans().At(0).Name())
assert.Equal(t, "test-span-0-15", split.ResourceSpans().At(1).InstrumentationLibrarySpans().At(0).Spans().At(4).Name())
}
1 change: 1 addition & 0 deletions processor/batchprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ processors:
batch/2:
timeout: 10s
send_batch_size: 10000
send_batch_max_size: 11000

exporters:
exampleexporter:
Expand Down