Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor) Fail fast when size limit is exceeded #126

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Concurrent Batch Processor

This component is an experimental processor, forked from the [core
OpenTelemetry Collector `batchprocessor`
component](https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md).
Relative to that component, this component differs in the following ways:

1. Synchronous pipeline support: this component blocks each producer
until the request returns with success or an error status code.
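2. Maximum in-flight-bytes setting: this component measures the
in-memory size of each request and admits it to the pipeline only
while the total in-flight size stays within the configured limit.
Requests that do not fit stall until capacity is released or they
time out; a single request larger than the limit fails immediately.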
3. Unlimited concurrency: this component will start as many goroutines
as needed to send batches through the pipeline.

Here is an example configuration:

```
processors:
concurrentbatch:
send_batch_max_size: 1500
send_batch_size: 1000
timeout: 1s
max_in_flight_size_mib: 128
```

In this configuration, the component will admit up to 128MiB of
request data before stalling.
14 changes: 12 additions & 2 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ type batchProcessor struct {
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

sem *semaphore.Weighted
// in-flight bytes limit mechanism
limitBytes int64
sem *semaphore.Weighted
}

type batcher interface {
Expand Down Expand Up @@ -150,12 +152,14 @@ var _ consumer.Logs = (*batchProcessor)(nil)

// newBatchProcessor returns a new batch processor component.
func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error) {

// use lower-case, to be consistent with http/2 headers.
mks := make([]string, len(cfg.MetadataKeys))
for i, k := range cfg.MetadataKeys {
mks[i] = strings.ToLower(k)
}
sort.Strings(mks)
limitBytes := int64(cfg.MaxInFlightSizeMiB) << 20
bp := &batchProcessor{
logger: set.Logger,

Expand All @@ -166,7 +170,8 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)<<20),
limitBytes: limitBytes,
sem: semaphore.NewWeighted(limitBytes),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
Expand Down Expand Up @@ -392,6 +397,11 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}

bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
return fmt.Errorf("request size exceeds max-in-flight bytes: %d", bytes)
}

err := b.processor.countAcquire(ctx, bytes)
if err != nil {
return err
Expand Down
124 changes: 73 additions & 51 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
}

type blockingConsumer struct {
lock sync.Mutex
numItems int
lock sync.Mutex
numItems int
numBytesAcquired int64
blocking chan struct{}
sem *semaphore.Weighted
szr *ptrace.ProtoMarshaler
blocking chan struct{}
sem *semaphore.Weighted
szr *ptrace.ProtoMarshaler
}

func (bc *blockingConsumer) getItemsWaiting() int {
Expand Down Expand Up @@ -243,9 +243,9 @@ func (bc *blockingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// helper function to help determine a setting for cfg.MaxInFlightBytesMiB based
// helper function to help determine a setting for cfg.MaxInFlightSizeMiB based
// on the number of requests and number of spans per request.
func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 {
func calculateMaxInFlightSizeMiB(numRequests, spansPerRequest int) uint32 {
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -270,13 +270,13 @@ func TestBatchProcessorCancelContext(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest)
cfg.MaxInFlightSizeMiB = calculateMaxInFlightSizeMiB(requestCount, spansPerRequest)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bc := &blockingConsumer{
blocking: make(chan struct{}, 1),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB<<20)),
szr: &ptrace.ProtoMarshaler{},
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightSizeMiB << 20)),
szr: &ptrace.ProtoMarshaler{},
}
bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true)
require.NoError(t, err)
Expand Down Expand Up @@ -308,9 +308,9 @@ func TestBatchProcessorCancelContext(t *testing.T) {
return bc.getItemsWaiting() == numSpans
}, 5*time.Second, 10*time.Millisecond)

// MaxInFlightBytesMiB is the upperbound on in flight bytes, so calculate
// MaxInFlightSizeMiB is the upperbound on in flight bytes, so calculate
// how many free bytes the semaphore has.
excess := int64(cfg.MaxInFlightBytesMiB<<20) - bc.numBytesAcquired
excess := int64(cfg.MaxInFlightSizeMiB<<20) - bc.numBytesAcquired
assert.False(t, bp.sem.TryAcquire(excess+1))

// cancel context and wait for ConsumeTraces to return.
Expand All @@ -321,9 +321,9 @@ func TestBatchProcessorCancelContext(t *testing.T) {
// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytesMiB bytes.
// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightSizeMiB bytes.
require.Eventually(t, func() bool {
return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB<<20))
return bp.sem.TryAcquire(int64(cfg.MaxInFlightSizeMiB << 20))
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, bp.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -591,9 +591,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
sink := new(consumertest.TracesSink)

Expand Down Expand Up @@ -626,9 +626,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -692,9 +692,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -766,9 +766,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -812,9 +812,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

func TestBatchMetricProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -905,20 +905,20 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
func BenchmarkBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}

func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}
Expand Down Expand Up @@ -966,9 +966,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1032,9 +1032,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1084,9 +1084,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1130,9 +1130,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1400,7 +1400,7 @@ func TestBatchZeroConfig(t *testing.T) {
// This is a no-op configuration. No need for a timer, no
// minimum, no maximum, just a pass through.
cfg := Config{
MaxInFlightBytesMiB: defaultMaxMiB,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1442,8 +1442,8 @@ func TestBatchSplitOnly(t *testing.T) {
const logsPerRequest = 100

cfg := Config{
SendBatchMaxSize: maxBatch,
MaxInFlightBytesMiB: defaultMaxMiB,
SendBatchMaxSize: maxBatch,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1472,3 +1472,25 @@ func TestBatchSplitOnly(t *testing.T) {
require.Equal(t, maxBatch, ld.LogRecordCount())
}
}

// TestBatchTooLarge verifies that a single request whose measured size is
// larger than the configured in-flight limit is rejected with an error
// instead of being admitted to the pipeline.
func TestBatchTooLarge(t *testing.T) {
	// A 1 MiB limit combined with a 100000-record request (below) is
	// expected to put the request over the in-flight limit.
	cfg := Config{
		SendBatchMaxSize:   100000,
		SendBatchSize:      100000,
		MaxInFlightSizeMiB: 1,
	}

	require.NoError(t, cfg.Validate())

	sink := new(consumertest.LogsSink)
	creationSet := processortest.NewNopCreateSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
	require.NoError(t, err)
	require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

	ld := testdata.GenerateLogs(100000)
	err = batcher.ConsumeLogs(context.Background(), ld)
	// Use require here: if err were nil, the err.Error() call below would
	// panic with a nil-pointer dereference rather than fail the test cleanly.
	require.Error(t, err)
	assert.Contains(t, err.Error(), "request size exceeds max-in-flight bytes")
}
16 changes: 8 additions & 8 deletions collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type Config struct {
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

// MaxInFlightBytes limits the number of bytes in queue waiting to be
// MaxInFlightSizeMiB limits the number of bytes in queue waiting to be
// processed by the senders.
MaxInFlightBytesMiB uint32 `mapstructure:"max_in_flight_bytes_mib"`
MaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"`

// Deprecated: Use MaxInFlightBytesMiB instead.
// Deprecated: Use MaxInFlightSizeMiB instead.
MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"`
}

Expand All @@ -72,18 +72,18 @@ func (cfg *Config) Validate() error {
return errors.New("timeout must be greater or equal to 0")
}

if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightBytesMiB != 0 {
return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_bytes_mib instead")
if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightSizeMiB != 0 {
return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_size_mib instead")
}

if cfg.MaxInFlightBytes > 0 {
// Round up
cfg.MaxInFlightBytesMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20
cfg.MaxInFlightSizeMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20
cfg.MaxInFlightBytes = 0
}

if cfg.MaxInFlightBytesMiB < 0 {
return errors.New("max_in_flight_bytes_mib must be greater than or equal to 0")
if cfg.MaxInFlightSizeMiB < 0 {
return errors.New("max_in_flight_size_mib must be greater than or equal to 0")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
MaxInFlightBytesMiB: 12345,
MaxInFlightSizeMiB: 12345,
}, cfg)
}

Expand Down
Loading