From dd351375458072fe29594f99a2eb8cbd9d7b6b1f Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 25 Sep 2024 18:04:38 -0700 Subject: [PATCH] [exporter] [chore] Initialize batchSender and queueSender after configuration - #2 (#11240) This PR follows https://github.com/open-telemetry/opentelemetry-collector/pull/11041. The previous PR changed the initialization of `batchSender` and `queueSender` to AFTER configuration, because that enables us to access `queueConfig` and `batcherConfig` in the same place. I noticed since then that there is another API for queue configuration, and this PR takes care of that other API #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector/issues/10368 #### Testing Ran `opentelemetry-collector$ make` to make sure all tests still pass. --- .../exporterhelper/internal/base_exporter.go | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 9d2713c01c0..0155cb53298 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -61,10 +61,11 @@ type BaseExporter struct { ConsumerOptions []consumer.Option - QueueCfg exporterqueue.Config - QueueFactory exporterqueue.Factory[internal.Request] - BatcherCfg exporterbatcher.Config - BatcherOpts []BatcherOption + QueueCfg QueueConfig + ExporterQueueCfg exporterqueue.Config + QueueFactory exporterqueue.Factory[internal.Request] + BatcherCfg exporterbatcher.Config + BatcherOpts []BatcherOption } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -93,28 +94,40 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe return nil, err } - if be.BatcherCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) - for _, opt := range be.BatcherOpts { - err = multierr.Append(err, opt(bs)) - } - if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { - err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) - } - be.BatchSender = bs + if be.QueueCfg.Enabled { + q := be.QueueFactory(context.Background(), exporterqueue.Settings{ + Signal: be.Signal, + ExporterSettings: be.Set, + }, exporterqueue.Config{ + Enabled: be.QueueCfg.Enabled, + NumConsumers: be.QueueCfg.NumConsumers, + QueueSize: be.QueueCfg.QueueSize, + }) + be.QueueSender = NewQueueSender(q, be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) } - if be.QueueCfg.Enabled { + if be.ExporterQueueCfg.Enabled { set := exporterqueue.Settings{ Signal: be.Signal, ExporterSettings: be.Set, } - be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.QueueCfg), be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) + be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.ExporterQueueCfg), be.Set, be.ExporterQueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) for _, op := range options { err = multierr.Append(err, op(be)) } } + if be.BatcherCfg.Enabled { + bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) + for _, opt := range be.BatcherOpts { + err = multierr.Append(err, opt(bs)) + } + if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { + err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) + } + be.BatchSender = bs + } + if err != nil { return nil, err } @@ -230,19 +243,11 @@ func WithQueue(config QueueConfig) Option { o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - qf := exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{ + o.QueueCfg = config + o.QueueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{ Marshaler: o.Marshaler, Unmarshaler: o.Unmarshaler, }) - q := qf(context.Background(), exporterqueue.Settings{ - Signal: o.Signal, - ExporterSettings: o.Set, - }, exporterqueue.Config{ - Enabled: config.Enabled, - NumConsumers: config.NumConsumers, - QueueSize: config.QueueSize, - }) - o.QueueSender = NewQueueSender(q, o.Set, config.NumConsumers, o.ExportFailureMessage, o.Obsrep) return nil } } @@ -260,7 +265,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - o.QueueCfg = cfg + o.ExporterQueueCfg = cfg o.QueueFactory = queueFactory return nil }