Skip to content

Commit

Permalink
Cleanup initialization order for exporterhelper
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 1, 2025
1 parent 5f95f72 commit dfa9a23
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 130 deletions.
25 changes: 25 additions & 0 deletions .chloggen/clenup-initialization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix a bug where an exporter using the new batcher could be incorrectly marked as non-mutating.

# One or more tracking issues or pull requests related to the change
issues: [12239]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Only affects users that manually turned on `exporter.UsePullingBasedExporterQueueBatcher` featuregate.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
140 changes: 76 additions & 64 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request]
type ObsrepSenderFactory = func(obsrep *ObsReport, next Sender[internal.Request]) Sender[internal.Request]

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error
Expand All @@ -52,17 +52,20 @@ type BaseExporter struct {
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]

firstSender Sender[internal.Request]

ConsumerOptions []consumer.Option

queueCfg exporterqueue.Config
timeoutCfg TimeoutConfig
retryCfg configretry.BackOffConfig
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
queueCfg exporterqueue.Config
batcherCfg exporterbatcher.Config
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand All @@ -72,98 +75,107 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

be := &BaseExporter{
BatchSender: &BaseSender[internal.Request]{},
QueueSender: &BaseSender[internal.Request]{},
ObsrepSender: osf(obsReport),
RetrySender: &BaseSender[internal.Request]{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
timeoutCfg: NewDefaultTimeoutConfig(),
Set: set,
}

for _, op := range options {
err = multierr.Append(err, op(be))
if err = op(be); err != nil {
return nil, err
}
}
if err != nil {
return nil, err

// TimeoutSender is always initialized.
be.firstSender = &TimeoutSender{cfg: be.timeoutCfg}
if be.retryCfg.Enabled {
be.RetrySender = newRetrySender(be.retryCfg, set, be.firstSender)
be.firstSender = be.RetrySender
}

be.ObsrepSender = osf(obsReport, be.firstSender)
be.firstSender = be.ObsrepSender

if be.batcherCfg.Enabled {
// Batcher mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.batcherCfg.Enabled && !be.queueCfg.Enabled {
concurrencyLimit := int64(0)
if be.queueCfg.Enabled {
concurrencyLimit = int64(be.queueCfg.NumConsumers)
}
be.BatchSender = NewBatchSender(be.batcherCfg, set, concurrencyLimit, be.firstSender)
be.firstSender = be.BatchSender
}

if be.queueCfg.Enabled {
qSet := exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
ExporterSettings: set,
}
q := be.queueFactory(context.Background(), qSet, be.queueCfg)
q, err = newObsQueue(qSet, q)
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
if err != nil {
return nil, err
}
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.BatcherCfg)
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}

be.connectSenders()

if bs, ok := be.BatchSender.(*BatchSender); ok {
// If queue sender is enabled assign to the batch sender the same number of workers.
if qs, ok := be.QueueSender.(*QueueSender); ok {
bs.concurrencyLimit = int64(qs.numConsumers)
}
// Batcher sender mutates the data.
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
be.firstSender = be.QueueSender
}

return be, nil
}

// Send sends the request using the first sender in the chain.
func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error {
err := be.QueueSender.Send(ctx, req)
err := be.firstSender.Send(ctx, req)
if err != nil {
be.Set.Logger.Error("Exporting failed. Rejecting data."+be.ExportFailureMessage,
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
}
return err
}

// connectSenders connects the senders in the predefined order.
func (be *BaseExporter) connectSenders() {
be.QueueSender.SetNextSender(be.BatchSender)
be.BatchSender.SetNextSender(be.ObsrepSender)
be.ObsrepSender.SetNextSender(be.RetrySender)
be.RetrySender.SetNextSender(be.TimeoutSender)
}

func (be *BaseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the BatchSender.
if err := be.BatchSender.Start(ctx, host); err != nil {
return err
if be.BatchSender != nil {
// If no error then start the BatchSender.
if err := be.BatchSender.Start(ctx, host); err != nil {
return err
}

Check warning on line 148 in exporter/exporterhelper/internal/base_exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/base_exporter.go#L147-L148

Added lines #L147 - L148 were not covered by tests
}

// Last start the queueSender.
return be.QueueSender.Start(ctx, host)
if be.QueueSender != nil {
return be.QueueSender.Start(ctx, host)
}

return nil
}

func (be *BaseExporter) Shutdown(ctx context.Context) error {
return multierr.Combine(
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
be.RetrySender.Shutdown(ctx),
// Then shutdown the batch sender
be.BatchSender.Shutdown(ctx),
// Then shutdown the queue sender.
be.QueueSender.Shutdown(ctx),
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
var err error

// First shutdown the retry sender, so the queue sender can flush the queue without retries.
if be.RetrySender != nil {
err = multierr.Append(err, be.RetrySender.Shutdown(ctx))
}

// Then shutdown the batch sender
if be.BatchSender != nil {
err = multierr.Append(err, be.BatchSender.Shutdown(ctx))
}

// Then shutdown the queue sender.
if be.QueueSender != nil {
err = multierr.Append(err, be.QueueSender.Shutdown(ctx))
}

// Last shutdown the wrapped exporter itself.
return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
}

// WithStart overrides the default Start function for an exporter.
Expand All @@ -188,7 +200,7 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
// The default TimeoutConfig is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return func(o *BaseExporter) error {
o.TimeoutSender.cfg = timeoutConfig
o.timeoutCfg = timeoutConfig
return nil
}
}
Expand All @@ -201,7 +213,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
o.ExportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return nil
}
o.RetrySender = newRetrySender(config, o.Set)
o.retryCfg = config
return nil
}
}
Expand Down Expand Up @@ -268,7 +280,7 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config) Option {
return func(o *BaseExporter) error {
o.BatcherCfg = cfg
o.batcherCfg = cfg
return nil
}
}
Expand Down
10 changes: 8 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ var (
}()
)

func newNoopObsrepSender(*ObsReport) Sender[internal.Request] {
return &BaseSender[internal.Request]{}
type noopSender struct {
component.StartFunc
component.ShutdownFunc
SendFunc[internal.Request]
}

func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
return &noopSender{SendFunc: next.Send}
}

func TestBaseExporter(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type BatchSender struct {
BaseSender[internal.Request]
cfg exporterbatcher.Config
cfg exporterbatcher.Config
next Sender[internal.Request]

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
Expand All @@ -43,11 +43,13 @@ type BatchSender struct {
stopped *atomic.Bool
}

// newBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
// NewBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, concurrencyLimit int64, next Sender[internal.Request]) *BatchSender {
bs := &BatchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
next: next,
concurrencyLimit: concurrencyLimit,
activeBatch: newEmptyBatch(),
logger: set.Logger,
shutdownCh: nil,
shutdownCompleteCh: make(chan struct{}),
Expand Down Expand Up @@ -119,7 +121,7 @@ func newEmptyBatch() *batch {
// Caller must hold the lock.
func (bs *BatchSender) exportActiveBatch() {
go func(b *batch) {
b.err = bs.NextSender.Send(b.ctx, b.request)
b.err = bs.next.Send(b.ctx, b.request)
close(b.done)
bs.activeRequests.Add(-b.requestsBlocked)
}(bs.activeBatch)
Expand All @@ -138,7 +140,7 @@ func (bs *BatchSender) isActiveBatchReady() bool {
func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
// Stopped batch sender should act as pass-through to allow the queue to be drained.
if bs.stopped.Load() {
return bs.NextSender.Send(ctx, req)
return bs.next.Send(ctx, req)
}

if bs.cfg.MaxSizeItems > 0 {
Expand Down Expand Up @@ -190,7 +192,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req
// Intentionally do not put the last request in the active batch to not block it.
// TODO: Consider including the partial request in the error to avoid double publishing.
for _, r := range reqs {
if err := bs.NextSender.Send(ctx, r); err != nil {
if err := bs.next.Send(ctx, r); err != nil {
return err
}
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/internal/obs_report_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/internal"
)

type obsReportSender[K internal.Request] struct {
BaseSender[K]
component.StartFunc
component.ShutdownFunc
obsrep *ObsReport
next Sender[K]
}

func NewObsReportSender[K internal.Request](obsrep *ObsReport) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep}
func NewObsReportSender[K internal.Request](obsrep *ObsReport, next Sender[K]) Sender[K] {
return &obsReportSender[K]{obsrep: obsrep, next: next}
}

func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
c := ors.obsrep.StartOp(ctx)
items := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := ors.NextSender.Send(c, req)
err := ors.next.Send(c, req)
ors.obsrep.EndOp(c, items, err)
return err
}
Loading

0 comments on commit dfa9a23

Please sign in to comment.