Skip to content

Commit

Permalink
(concurrentbatchprocessor) Fail fast when size limit is exceeded (#126)
Browse files Browse the repository at this point in the history
Fixes #124 
Unrelated cleanups:
- rename max_in_flight_bytes_mib to max_in_flight_size_mib ("bytes" is
redundant), since it's unreleased
- add a brief README.md
- remove an accidental profile output named `out`.
  • Loading branch information
jmacd authored Dec 20, 2023
1 parent 75f233e commit 14a4666
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 352 deletions.
28 changes: 28 additions & 0 deletions collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Concurrent Batch Processor

This component is an experimental processor, forked from the [core
OpenTelemetry Collector `batchprocessor`
component](https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/README.md).
The differences in this component, relative to that component, are:

1. Synchronous pipeline support: this component blocks each producer
until the request returns with success or an error status code.
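2. Maximum in-flight-bytes setting. This component measures the
in-memory size of each request and admits it to the pipeline while
the total in-flight size is within the limit; otherwise it stalls
requests until they time out. A single request larger than the
limit fails immediately.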
3. Unlimited concurrency: this component will start as many goroutines
as needed to send batches through the pipeline.

Here is an example configuration:

```
processors:
  concurrentbatch:
    send_batch_max_size: 1500
    send_batch_size: 1000
    timeout: 1s
    max_in_flight_size_mib: 128
```

In this configuration, the component will admit up to 128MiB of
request data before stalling.
14 changes: 12 additions & 2 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ type batchProcessor struct {
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

sem *semaphore.Weighted
// in-flight bytes limit mechanism
limitBytes int64
sem *semaphore.Weighted
}

type batcher interface {
Expand Down Expand Up @@ -150,12 +152,14 @@ var _ consumer.Logs = (*batchProcessor)(nil)

// newBatchProcessor returns a new batch processor component.
func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error) {

// use lower-case, to be consistent with http/2 headers.
mks := make([]string, len(cfg.MetadataKeys))
for i, k := range cfg.MetadataKeys {
mks[i] = strings.ToLower(k)
}
sort.Strings(mks)
limitBytes := int64(cfg.MaxInFlightSizeMiB) << 20
bp := &batchProcessor{
logger: set.Logger,

Expand All @@ -166,7 +170,8 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)<<20),
limitBytes: limitBytes,
sem: semaphore.NewWeighted(limitBytes),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
Expand Down Expand Up @@ -392,6 +397,11 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}

bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
return fmt.Errorf("request size exceeds max-in-flight bytes: %d", bytes)
}

err := b.processor.countAcquire(ctx, bytes)
if err != nil {
return err
Expand Down
124 changes: 73 additions & 51 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
}

type blockingConsumer struct {
lock sync.Mutex
numItems int
lock sync.Mutex
numItems int
numBytesAcquired int64
blocking chan struct{}
sem *semaphore.Weighted
szr *ptrace.ProtoMarshaler
blocking chan struct{}
sem *semaphore.Weighted
szr *ptrace.ProtoMarshaler
}

func (bc *blockingConsumer) getItemsWaiting() int {
Expand Down Expand Up @@ -243,9 +243,9 @@ func (bc *blockingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// helper function to help determine a setting for cfg.MaxInFlightBytesMiB based
// helper function to help determine a setting for cfg.MaxInFlightSizeMiB based
// on the number of requests and number of spans per request.
func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 {
func calculateMaxInFlightSizeMiB(numRequests, spansPerRequest int) uint32 {
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -270,13 +270,13 @@ func TestBatchProcessorCancelContext(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest)
cfg.MaxInFlightSizeMiB = calculateMaxInFlightSizeMiB(requestCount, spansPerRequest)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bc := &blockingConsumer{
blocking: make(chan struct{}, 1),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB<<20)),
szr: &ptrace.ProtoMarshaler{},
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightSizeMiB << 20)),
szr: &ptrace.ProtoMarshaler{},
}
bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true)
require.NoError(t, err)
Expand Down Expand Up @@ -308,9 +308,9 @@ func TestBatchProcessorCancelContext(t *testing.T) {
return bc.getItemsWaiting() == numSpans
}, 5*time.Second, 10*time.Millisecond)

// MaxInFlightBytesMiB is the upperbound on in flight bytes, so calculate
// MaxInFlightSizeMiB is the upperbound on in flight bytes, so calculate
// how many free bytes the semaphore has.
excess := int64(cfg.MaxInFlightBytesMiB<<20) - bc.numBytesAcquired
excess := int64(cfg.MaxInFlightSizeMiB<<20) - bc.numBytesAcquired
assert.False(t, bp.sem.TryAcquire(excess+1))

// cancel context and wait for ConsumeTraces to return.
Expand All @@ -321,9 +321,9 @@ func TestBatchProcessorCancelContext(t *testing.T) {
// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytesMiB bytes.
// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightSizeMiB bytes.
require.Eventually(t, func() bool {
return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB<<20))
return bp.sem.TryAcquire(int64(cfg.MaxInFlightSizeMiB << 20))
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, bp.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -591,9 +591,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
sink := new(consumertest.TracesSink)

Expand Down Expand Up @@ -626,9 +626,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -692,9 +692,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -766,9 +766,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -812,9 +812,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

func TestBatchMetricProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -905,20 +905,20 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
func BenchmarkBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}

func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}
Expand Down Expand Up @@ -966,9 +966,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1032,9 +1032,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1084,9 +1084,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1130,9 +1130,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1400,7 +1400,7 @@ func TestBatchZeroConfig(t *testing.T) {
// This is a no-op configuration. No need for a timer, no
// minimum, no maximum, just a pass through.
cfg := Config{
MaxInFlightBytesMiB: defaultMaxMiB,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1442,8 +1442,8 @@ func TestBatchSplitOnly(t *testing.T) {
const logsPerRequest = 100

cfg := Config{
SendBatchMaxSize: maxBatch,
MaxInFlightBytesMiB: defaultMaxMiB,
SendBatchMaxSize: maxBatch,
MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1472,3 +1472,25 @@ func TestBatchSplitOnly(t *testing.T) {
require.Equal(t, maxBatch, ld.LogRecordCount())
}
}

// TestBatchTooLarge verifies that a single request whose measured size
// exceeds the configured in-flight limit is rejected with an error
// instead of being admitted to the pipeline.
func TestBatchTooLarge(t *testing.T) {
	// A 1 MiB in-flight limit combined with a 100000-record request below
	// is expected to trip the "request size exceeds" check in the processor.
	cfg := Config{
		SendBatchMaxSize:   100000,
		SendBatchSize:      100000,
		MaxInFlightSizeMiB: 1,
	}

	require.NoError(t, cfg.Validate())

	sink := new(consumertest.LogsSink)
	creationSet := processortest.NewNopCreateSettings()
	creationSet.MetricsLevel = configtelemetry.LevelDetailed
	batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true)
	require.NoError(t, err)
	require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

	ld := testdata.GenerateLogs(100000)
	err = batcher.ConsumeLogs(context.Background(), ld)
	// Use require (not assert) so the test stops here when err is nil;
	// otherwise the err.Error() call below would panic on a nil error
	// and mask the real failure.
	require.Error(t, err)
	assert.Contains(t, err.Error(), "request size exceeds max-in-flight bytes")
}
16 changes: 8 additions & 8 deletions collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ type Config struct {
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

// MaxInFlightBytes limits the number of bytes in queue waiting to be
// MaxInFlightSizeMiB limits the number of bytes in queue waiting to be
// processed by the senders.
MaxInFlightBytesMiB uint32 `mapstructure:"max_in_flight_bytes_mib"`
MaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"`

// Deprecated: Use MaxInFlightBytesMiB instead.
// Deprecated: Use MaxInFlightSizeMiB instead.
MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"`
}

Expand All @@ -72,18 +72,18 @@ func (cfg *Config) Validate() error {
return errors.New("timeout must be greater or equal to 0")
}

if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightBytesMiB != 0 {
return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_bytes_mib instead")
if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightSizeMiB != 0 {
return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_size_mib instead")
}

if cfg.MaxInFlightBytes > 0 {
// Round up
cfg.MaxInFlightBytesMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20
cfg.MaxInFlightSizeMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20
cfg.MaxInFlightBytes = 0
}

if cfg.MaxInFlightBytesMiB < 0 {
return errors.New("max_in_flight_bytes_mib must be greater than or equal to 0")
if cfg.MaxInFlightSizeMiB < 0 {
return errors.New("max_in_flight_size_mib must be greater than or equal to 0")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
MaxInFlightBytesMiB: 12345,
MaxInFlightSizeMiB: 12345,
}, cfg)
}

Expand Down
Loading

0 comments on commit 14a4666

Please sign in to comment.