Skip to content

Commit

Permalink
[chore] Refactor RetrySender tests to be real unit-tests (#12240)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 2, 2025
1 parent d89466c commit eb4f640
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 382 deletions.
8 changes: 7 additions & 1 deletion exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type noopSender struct {
SendFunc[internal.Request]
}

func newNoopExportSender() Sender[internal.Request] {
return &noopSender{SendFunc: func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
}}
}

func newNoopObsrepSender(_ *ObsReport, next Sender[internal.Request]) Sender[internal.Request] {
return &noopSender{SendFunc: next.Send}
}
Expand Down Expand Up @@ -112,7 +118,7 @@ func TestBaseExporterLogging(t *testing.T) {
rCfg.Enabled = false
bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg))
require.NoError(t, err)
sendErr := bs.Send(context.Background(), newErrorRequest())
sendErr := bs.Send(context.Background(), newErrorRequest(errors.New("my error")))
require.Error(t, sendErr)

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
Expand Down
16 changes: 8 additions & 8 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
ocs := be.ObsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

firstMockR := newErrorRequest()
firstMockR := newErrorRequest(errors.New("transient error"))
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.Send(context.Background(), firstMockR))
Expand All @@ -57,7 +57,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {

require.NoError(t, be.Shutdown(context.Background()))

secondMockR.checkNumRequests(t, 1)
secondMockR.checkOneRequests(t)
ocs.checkSendItemsCount(t, 3)
ocs.checkDroppedItemsCount(t, 7)
require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
})
ocs.awaitAsyncProcessing()

mockR.checkNumRequests(t, 1)
mockR.checkOneRequests(t)
ocs.checkSendItemsCount(t, 2)
ocs.checkDroppedItemsCount(t, 0)
require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {

require.Len(t, reqs, wantRequests)
for _, req := range reqs {
req.checkNumRequests(t, 1)
req.checkOneRequests(t)
}

ocs.checkSendItemsCount(t, 2*wantRequests)
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
ocs.awaitAsyncProcessing()
mockR.checkNumRequests(t, 1)
mockR.checkOneRequests(t)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -366,7 +366,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
mockR := newMockRequest(2, errors.New("some error"))
require.NoError(t, be.Send(context.Background(), mockR))
require.NoError(t, be.Shutdown(context.Background()))
mockR.checkNumRequests(t, 1)
mockR.checkOneRequests(t)
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
})
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered

mockReq := newErrorRequest()
mockReq := newErrorRequest(errors.New("transient error"))
be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
})

// wait for the item to be consumed from the queue
replacedReq.checkNumRequests(t, 1)
replacedReq.checkOneRequests(t)
})
}
runTest("enable_queue_batcher", true)
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (rs *retrySender) Send(ctx context.Context, req internal.Request) error {
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
return fmt.Errorf("request is cancelled or timed out: %w", err)
case <-rs.stopCh:
return experr.NewShutdownErr(err)
case <-time.After(backoffDelay):
Expand Down
Loading

0 comments on commit eb4f640

Please sign in to comment.