diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 6ffc878d583c..3ad07597bec3 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -61,12 +61,19 @@ type batchProcessor struct { telemetry *batchProcessorTelemetry - // batcher will be either *singletonBatcher or *multiBatcher + // batcher will be either *singletonBatcher or *multiBatcher batcher batcher } +// batcher is describes a *singletonBatcher or *multiBatcher. type batcher interface { + // start initializes background resources used by this batcher. + start(ctx context.Context) error + + // consume incorporates a new item of data into the pending batch. consume(ctx context.Context, data any) error + + // currentMetadataCardinality returns the number of shards. currentMetadataCardinality() int } @@ -135,12 +142,13 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat metadataLimit: int(cfg.MetadataCardinalityLimit), } if len(bp.metadataKeys) == 0 { - s := bp.newShard(nil) - s.start() - bp.batcher = &singleShardBatcher{batcher: s} + bp.batcher = &singleShardBatcher{ + processor: bp, + single: nil, // created in start + } } else { bp.batcher = &multiShardBatcher{ - batchProcessor: bp, + processor: bp, } } @@ -172,8 +180,8 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities { } // Start is invoked during service startup. -func (bp *batchProcessor) Start(context.Context, component.Host) error { - return nil +func (bp *batchProcessor) Start(ctx context.Context, _ component.Host) error { + return bp.batcher.start(ctx) } // Shutdown is invoked during service shutdown. @@ -281,11 +289,18 @@ func (b *shard) sendItems(trigger trigger) { // singleShardBatcher is used when metadataKeys is empty, to avoid the // additional lock and map operations used in multiBatcher. type singleShardBatcher struct { - batcher *shard + processor *batchProcessor + single *shard +} + +func (sb *singleShardBatcher) start(context.Context) error { + sb.single = sb.processor.newShard(nil) + sb.single.start() + return nil } func (sb *singleShardBatcher) consume(_ context.Context, data any) error { - sb.batcher.newItem <- data + sb.single.newItem <- data return nil } @@ -295,8 +310,8 @@ func (sb *singleShardBatcher) currentMetadataCardinality() int { // multiBatcher is used when metadataKeys is not empty. type multiShardBatcher struct { - *batchProcessor - batchers sync.Map + processor *batchProcessor + batchers sync.Map // Guards the size and the storing logic to ensure no more than limit items are stored. // If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic. @@ -304,13 +319,17 @@ type multiShardBatcher struct { size int } +func (mb *multiShardBatcher) start(context.Context) error { + return nil +} + func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. info := client.FromContext(ctx) md := map[string][]string{} var attrs []attribute.KeyValue - for _, k := range mb.metadataKeys { + for _, k := range mb.processor.metadataKeys { // Lookup the value in the incoming metadata, copy it // into the outgoing metadata, and create a unique // value for the attributeSet. @@ -327,7 +346,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { b, ok := mb.batchers.Load(aset) if !ok { mb.lock.Lock() - if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit { + if mb.processor.metadataLimit != 0 && mb.size >= mb.processor.metadataLimit { mb.lock.Unlock() return errTooManyBatchers } @@ -335,7 +354,7 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { // aset.ToSlice() returns the sorted, deduplicated, // and name-downcased list of attributes. var loaded bool - b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md)) + b, loaded = mb.batchers.LoadOrStore(aset, mb.processor.newShard(md)) if !loaded { // Start the goroutine only if we added the object to the map, otherwise is already started. b.(*shard).start()