Skip to content

Commit

Permalink
Do not inherit cancellation in the async sending queue (#1930)
Browse files Browse the repository at this point in the history
* Do not inherit cancellation in the async sending queue

Signed-off-by: Bogdan Drutu <[email protected]>

* Add tests, fix broken previous test

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 9, 2020
1 parent a0a6384 commit 14a0208
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
20 changes: 20 additions & 0 deletions exporter/exporterhelper/queued_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package exporterhelper

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -110,6 +111,9 @@ func (qrs *queuedRetrySender) send(req request) (int, error) {
return qrs.consumerSender.send(req)
}

// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
req.setContext(noCancellationContext{Context: req.context()})
if !qrs.queue.Produce(req) {
return req.count(), errorRefused
}
Expand Down Expand Up @@ -209,3 +213,19 @@ func max(x, y time.Duration) time.Duration {
}
return x
}

type noCancellationContext struct {
context.Context
}

func (noCancellationContext) Deadline() (deadline time.Time, ok bool) {
return
}

func (noCancellationContext) Done() <-chan struct{} {
return nil
}

func (noCancellationContext) Err() error {
return nil
}
53 changes: 37 additions & 16 deletions exporter/exporterhelper/queued_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,36 +148,32 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
// require.Zero(t, be.qrSender.queue.Size())
}

func TestQueuedRetry_PreserveCancellation(t *testing.T) {
func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := CreateDefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := CreateDefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

ctx, cancelFunc := context.WithCancel(context.Background())
mockR := newMockRequest(ctx, 2, errors.New("transient error"))
start := time.Now()
cancelFunc()
mockR := newMockRequest(ctx, 2, nil)
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
droppedItems, err := be.sender.send(mockR)
require.NoError(t, err)
assert.Equal(t, 0, droppedItems)
})
cancelFunc()

// Stop should succeed and not retry.
assert.NoError(t, be.Shutdown(context.Background()))

// We should ensure that we actually did not wait for the initial backoff (5 sec).
assert.True(t, 5*time.Second > time.Since(start))
ocs.awaitAsyncProcessing()

// In the newMockConcurrentExporter we count requests and items even for failed requests.
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
ocs.checkSendItemsCount(t, 2)
ocs.checkDroppedItemsCount(t, 0)
require.Zero(t, be.qrSender.queue.Size())
}

Expand Down Expand Up @@ -318,9 +314,12 @@ func TestQueuedRetryHappyPath(t *testing.T) {
})

wantRequests := 10
reqs := make([]*mockRequest, 0, 10)
for i := 0; i < wantRequests; i++ {
ocs.run(func() {
droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, nil))
req := newMockRequest(context.Background(), 2, nil)
reqs = append(reqs, req)
droppedItems, err := be.sender.send(req)
require.NoError(t, err)
assert.Equal(t, 0, droppedItems)
})
Expand All @@ -329,10 +328,31 @@ func TestQueuedRetryHappyPath(t *testing.T) {
// Wait until all batches received
ocs.awaitAsyncProcessing()

require.Len(t, reqs, wantRequests)
for _, req := range reqs {
req.checkNumRequests(t, 1)
}

ocs.checkSendItemsCount(t, 2*wantRequests)
ocs.checkDroppedItemsCount(t, 0)
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
cancelFunc()
require.Error(t, ctx.Err())
d, ok := ctx.Deadline()
require.True(t, ok)
require.Equal(t, deadline, d)

nctx := noCancellationContext{Context: ctx}
assert.NoError(t, nctx.Err())
d, ok = nctx.Deadline()
assert.False(t, ok)
assert.True(t, d.IsZero())
}

type mockErrorRequest struct {
baseRequest
}
Expand Down Expand Up @@ -363,7 +383,7 @@ type mockRequest struct {
requestCount *int64
}

func (m *mockRequest) export(_ context.Context) (int, error) {
func (m *mockRequest) export(ctx context.Context) (int, error) {
atomic.AddInt64(m.requestCount, 1)
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -372,7 +392,8 @@ func (m *mockRequest) export(_ context.Context) (int, error) {
if err != nil {
return m.cnt, err
}
return 0, nil
// Respond like gRPC/HTTP, if context is cancelled, return error
return 0, ctx.Err()
}

func (m *mockRequest) onPartialError(consumererror.PartialError) request {
Expand Down

0 comments on commit 14a0208

Please sign in to comment.