diff --git a/processor/groupbytraceprocessor/event.go b/processor/groupbytraceprocessor/event.go index 831ffaba3b3..3427997c390 100644 --- a/processor/groupbytraceprocessor/event.go +++ b/processor/groupbytraceprocessor/event.go @@ -29,12 +29,6 @@ const ( // traceID to be released traceExpired - // released traces - traceReleased - - // traceID to be removed - traceRemoved - // shutdown stop ) @@ -56,21 +50,18 @@ type eventMachine struct { onTraceReceived func(pdata.Traces) error onTraceExpired func(pdata.TraceID) error - onTraceReleased func([]pdata.ResourceSpans) error - onTraceRemoved func(pdata.TraceID) error onError func(event) // shutdown sync - shutdownLock *sync.RWMutex - closed bool + done chan struct{} } func newEventMachine(logger *zap.Logger, bufferSize int) *eventMachine { em := &eventMachine{ - logger: logger, - events: make(chan event, bufferSize), - shutdownLock: &sync.RWMutex{}, + logger: logger, + events: make(chan event, bufferSize), + done: make(chan struct{}), } return em } @@ -94,7 +85,9 @@ func (em *eventMachine) start() { em.callOnError(e) continue } - em.onTraceReceived(payload) + if err := em.onTraceReceived(payload); err != nil { + em.logger.Debug("onTraceReceived failed", zap.Error(err)) + } case traceExpired: if em.onTraceExpired == nil { em.logger.Debug("onTraceExpired not set, skipping event") @@ -107,38 +100,12 @@ func (em *eventMachine) start() { em.callOnError(e) continue } - em.onTraceExpired(payload) - case traceReleased: - if em.onTraceReleased == nil { - em.logger.Debug("onTraceReleased not set, skipping event") - em.callOnError(e) - continue - } - payload, ok := e.payload.([]pdata.ResourceSpans) - if !ok { - // the payload had an unexpected type! - em.callOnError(e) - continue + if err := em.onTraceExpired(payload); err != nil { + em.logger.Debug("onTraceExpired failed", zap.Error(err)) } - em.onTraceReleased(payload) - case traceRemoved: - if em.onTraceRemoved == nil { - em.logger.Debug("onTraceRemoved not set, skipping event") - em.callOnError(e) - continue - } - payload, ok := e.payload.(pdata.TraceID) - if !ok { - // the payload had an unexpected type! 
- em.callOnError(e) - continue - } - em.onTraceRemoved(payload) case stop: - em.logger.Info("shuttting down the event machine") - em.shutdownLock.Lock() - em.closed = true - em.shutdownLock.Unlock() + em.logger.Info("shutting down the event machine") + close(em.done) e.payload.(*sync.WaitGroup).Done() return default: @@ -149,17 +116,12 @@ func (em *eventMachine) start() { } } -func (em *eventMachine) fire(events ...event) { - em.shutdownLock.RLock() - defer em.shutdownLock.RUnlock() - - // we are not accepting new events - if em.closed { +func (em *eventMachine) fire(event event) { + select { + case em.events <- event: + return + case <-em.done: return - } - - for _, e := range events { - em.events <- e } } diff --git a/processor/groupbytraceprocessor/event_test.go b/processor/groupbytraceprocessor/event_test.go index 54b2e6f3389..820058999ca 100644 --- a/processor/groupbytraceprocessor/event_test.go +++ b/processor/groupbytraceprocessor/event_test.go @@ -56,29 +56,6 @@ func TestEventCallback(t *testing.T) { } }, }, - { - casename: "onTraceReleased", - typ: traceReleased, - payload: []pdata.ResourceSpans{}, - registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { - em.onTraceReleased = func(expired []pdata.ResourceSpans) error { - wg.Done() - return nil - } - }, - }, - { - casename: "onTraceRemoved", - typ: traceRemoved, - payload: pdata.NewTraceID([]byte{1, 2, 3, 4}), - registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { - em.onTraceRemoved = func(expired pdata.TraceID) error { - wg.Done() - assert.Equal(t, pdata.NewTraceID([]byte{1, 2, 3, 4}), expired) - return nil - } - }, - }, } { t.Run(tt.casename, func(t *testing.T) { // prepare @@ -118,14 +95,6 @@ func TestEventCallbackNotSet(t *testing.T) { casename: "onTraceExpired", typ: traceExpired, }, - { - casename: "onTraceReleased", - typ: traceReleased, - }, - { - casename: "onTraceRemoved", - typ: traceRemoved, - }, } { t.Run(tt.casename, func(t *testing.T) { // prepare @@ -176,24 +145,6 @@ func TestEventInvalidPayload(t *testing.T) { } }, }, - { - casename: "onTraceReleased", - typ: traceReleased, - registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { - em.onTraceReleased = func(expired []pdata.ResourceSpans) error { - return nil - } - }, - }, - { - casename: "onTraceRemoved", - typ: traceRemoved, - registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { - em.onTraceRemoved = func(expired pdata.TraceID) error { - return nil - } - }, - }, } { t.Run(tt.casename, func(t *testing.T) { // prepare diff --git a/processor/groupbytraceprocessor/processor.go b/processor/groupbytraceprocessor/processor.go index e7cfdf8a875..0c9bf3fe753 100644 --- a/processor/groupbytraceprocessor/processor.go +++ b/processor/groupbytraceprocessor/processor.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "go.uber.org/zap" @@ -55,6 +56,12 @@ type groupByTraceProcessor struct { // the trace storage st storage + + // keeps track of outstanding goroutines + inprogressWg sync.WaitGroup + + // for easier testing of time.AfterFunc + timer tTimer } var _ component.TraceProcessor = (*groupByTraceProcessor)(nil) @@ -71,13 +78,13 @@ func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consu eventMachine: eventMachine, ringBuffer: newRingBuffer(config.NumTraces), st: st, + inprogressWg: sync.WaitGroup{}, + timer: &groupbyTimer{}, } // register the callbacks eventMachine.onTraceReceived = sp.onTraceReceived eventMachine.onTraceExpired = sp.onTraceExpired - eventMachine.onTraceReleased = 
sp.onTraceReleased - eventMachine.onTraceRemoved = sp.onTraceRemoved return sp, nil } @@ -103,6 +110,7 @@ func (sp *groupByTraceProcessor) Start(context.Context, component.Host) error { // Shutdown is invoked during service shutdown. func (sp *groupByTraceProcessor) Shutdown(_ context.Context) error { sp.eventMachine.shutdown() + sp.inprogressWg.Wait() return nil } @@ -150,15 +158,20 @@ func (sp *groupByTraceProcessor) processBatch(batch *singleTraceBatch) error { // place the trace ID in the buffer, and check if an item had to be evicted evicted := sp.ringBuffer.put(traceID) if evicted.Bytes() != nil { - // delete from the storage - sp.eventMachine.fire(event{ - typ: traceRemoved, - payload: evicted, - }) - // TODO: do we want another channel that receives evicted items? record a metric perhaps? sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", zap.String("traceID", evicted.HexString())) + // delete from the storage + trace, err := sp.st.delete(evicted) + if err != nil { + sp.logger.Info(fmt.Sprintf("couldn't delete trace %q from the storage: %s", evicted.HexString(), err.Error())) + } else if trace == nil { + sp.logger.Info(fmt.Sprintf("trace %q not found at the storage", evicted.HexString())) + } else { + // no need to wait on the nextConsumer + sp.inprogressWg.Add(1) + go sp.releaseTrace(trace) + } } // we have the traceID in the memory, place the spans in the storage too @@ -168,7 +181,7 @@ func (sp *groupByTraceProcessor) processBatch(batch *singleTraceBatch) error { sp.logger.Debug("scheduled to release trace", zap.Duration("duration", sp.config.WaitDuration)) - time.AfterFunc(sp.config.WaitDuration, func() { + sp.timer.AfterFunc(sp.config.WaitDuration, func() { // if the event machine has stopped, it will just discard the event sp.eventMachine.fire(event{ typ: traceExpired, @@ -194,59 +207,35 @@ func (sp *groupByTraceProcessor) onTraceExpired(traceID pdata.TraceID) error { // delete from the map and erase its memory entry sp.ringBuffer.delete(traceID) - // this might block, but we don't need to wait sp.logger.Debug("marking the trace as released", zap.String("traceID", traceID.HexString())) - go sp.markAsReleased(traceID) - return nil -} - -func (sp *groupByTraceProcessor) markAsReleased(traceID pdata.TraceID) error { - // #get is a potentially blocking operation - trace, err := sp.st.get(traceID) + trace, err := sp.st.delete(traceID) if err != nil { - return fmt.Errorf("couldn't retrieve trace %q from the storage: %w", traceID, err) + return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.HexString(), err) } if trace == nil { - return fmt.Errorf("the trace %q couldn't be found at the storage", traceID) + return fmt.Errorf("trace %q not found at the storage", traceID.HexString()) } + // no need to wait on the nextConsumer + sp.inprogressWg.Add(1) + go sp.releaseTrace(trace) - // signal that the trace is ready to be released - sp.logger.Debug("trace marked as released", zap.String("traceID", traceID.HexString())) - - // atomically fire the two events, so that a concurrent shutdown won't leave - // an orphaned trace in the storage - sp.eventMachine.fire(event{ - typ: traceReleased, - payload: trace, - }, event{ - typ: traceRemoved, - payload: traceID, - }) return nil } -func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) error { +func (sp *groupByTraceProcessor) releaseTrace(rss []pdata.ResourceSpans) { + defer sp.inprogressWg.Done() + trace := 
pdata.NewTraces() for _, rs := range rss { trace.ResourceSpans().Append(rs) } - return sp.nextConsumer.ConsumeTraces(context.Background(), trace) -} - -func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error { - trace, err := sp.st.delete(traceID) - if err != nil { - return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.HexString(), err) - } - if trace == nil { - return fmt.Errorf("trace %q not found at the storage", traceID.HexString()) + if err := sp.nextConsumer.ConsumeTraces(context.Background(), trace); err != nil { + sp.logger.Debug("next processor failed to process trace", zap.Error(err)) } - - return nil } func (sp *groupByTraceProcessor) addSpans(traceID pdata.TraceID, trace pdata.ResourceSpans) error { @@ -310,3 +299,14 @@ func splitByTrace(rs pdata.ResourceSpans) []*singleTraceBatch { return result } + +// tTimer interface allows easier testing of ticker related functionality used by groupbytraceprocessor +type tTimer interface { + AfterFunc(d time.Duration, f func()) +} + +type groupbyTimer struct{} + +func (*groupbyTimer) AfterFunc(d time.Duration, f func()) { + time.AfterFunc(d, f) +} diff --git a/processor/groupbytraceprocessor/processor_test.go b/processor/groupbytraceprocessor/processor_test.go index 31a759fd4bb..423ff0b49f0 100644 --- a/processor/groupbytraceprocessor/processor_test.go +++ b/processor/groupbytraceprocessor/processor_test.go @@ -18,10 +18,16 @@ import ( "context" "errors" "fmt" + "math/rand" "sync" "testing" "time" + "go.opentelemetry.io/collector/exporter/exportertest" + + "go.opentelemetry.io/collector/internal/data/testdata" + tracetranslator "go.opentelemetry.io/collector/translator/trace" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -30,6 +36,8 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" + + _ "net/http/pprof" ) var ( @@ -89,8 +97,6 @@ func TestTraceIsDispatchedAfterDuration(t *testing.T) { func TestInternalCacheLimit(t *testing.T) { // prepare - wg := &sync.WaitGroup{} // we wait for the next (mock) processor to receive the trace - config := Config{ // should be long enough for the test to run without traces being finished, but short enough to not // badly influence the testing experience @@ -100,26 +106,18 @@ func TestInternalCacheLimit(t *testing.T) { NumTraces: 5, } - wg.Add(5) // 5 traces are expected to be received - - receivedTraceIDs := []pdata.TraceID{} - mockProcessor := &mockProcessor{} - mockProcessor.onTraces = func(ctx context.Context, received pdata.Traces) error { - traceID := received.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).TraceID() - receivedTraceIDs = append(receivedTraceIDs, traceID) - fmt.Println("received trace") - wg.Done() - return nil - } + next := &exportertest.SinkTraceExporter{} st := newMemoryStorage() - p, err := newGroupByTraceProcessor(logger, st, mockProcessor, config) + p, err := newGroupByTraceProcessor(logger, st, next, config) require.NoError(t, err) + timer := manualTimer{funcs: make(chan func(), 100)} + p.timer = &timer + ctx := context.Background() p.Start(ctx, nil) - defer p.Shutdown(ctx) // test traceIDs := [][]byte{ @@ -145,18 +143,24 @@ func TestInternalCacheLimit(t *testing.T) { p.ConsumeTraces(ctx, pdata.TracesFromOtlp(batch)) } - wg.Wait() + for i := 0; i < 6; i++ { + timer.Step() + } + + 
p.Shutdown(ctx) + + receivedTraceIDs := []pdata.TraceID{} + for _, batch := range next.AllTraces() { + receivedTraceIDs = append(receivedTraceIDs, batch.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).TraceID()) + } // verify - assert.Equal(t, 5, len(receivedTraceIDs)) + assert.Equal(t, 6, len(receivedTraceIDs)) - for i := 5; i > 0; i-- { // last 5 traces + for i := 0; i < 6; i++ { traceID := pdata.NewTraceID(traceIDs[i]) assert.Contains(t, receivedTraceIDs, traceID) } - - // the first trace should have been evicted - assert.NotContains(t, receivedTraceIDs, traceIDs[0]) } func TestProcessorCapabilities(t *testing.T) { @@ -222,7 +226,7 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { NumTraces: 5, } st := &mockStorage{ - onGet: func(pdata.TraceID) ([]pdata.ResourceSpans, error) { + onDelete: func(pdata.TraceID) ([]pdata.ResourceSpans, error) { return nil, nil }, } @@ -232,6 +236,9 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { require.NoError(t, err) require.NotNil(t, p) + // Create the channel with no buffer so that the test is deterministic + p.eventMachine.events = make(chan event) + traceID := otlpcommon.NewTraceID([]byte{1, 2, 3, 4}) batch := []*v1.ResourceSpans{{ InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{{ @@ -250,7 +257,7 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { // test // we trigger this manually, instead of waiting the whole duration - err = p.markAsReleased(pdata.TraceID(traceID)) + err = p.onTraceExpired(pdata.TraceID(traceID)) // verify assert.Error(t, err) @@ -259,12 +266,12 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { func TestTraceErrorFromStorageWhileReleasing(t *testing.T) { // prepare config := Config{ - WaitDuration: time.Second, // we are not waiting for this whole time + WaitDuration: 5 * time.Second, // we are not waiting for this whole time NumTraces: 5, } expectedError := errors.New("some unexpected error") st := &mockStorage{ - onGet: func(pdata.TraceID) ([]pdata.ResourceSpans, error) { + onDelete: func(pdata.TraceID) ([]pdata.ResourceSpans, error) { return nil, expectedError }, } @@ -274,6 +281,9 @@ func TestTraceErrorFromStorageWhileReleasing(t *testing.T) { require.NoError(t, err) require.NotNil(t, p) + // Create the channel with no buffer so that the test is deterministic + p.eventMachine.events = make(chan event) + traceID := otlpcommon.NewTraceID([]byte{1, 2, 3, 4}) batch := []*v1.ResourceSpans{{ InstrumentationLibrarySpans: []*v1.InstrumentationLibrarySpans{{ @@ -292,9 +302,10 @@ func TestTraceErrorFromStorageWhileReleasing(t *testing.T) { // test // we trigger this manually, instead of waiting the whole duration - err = p.markAsReleased(pdata.TraceID(traceID)) + err = p.onTraceExpired(pdata.TraceID(traceID)) // verify + assert.NotNil(t, err) assert.True(t, errors.Is(err, expectedError)) } @@ -476,9 +487,10 @@ func TestErrorFromStorageWhileRemovingTrace(t *testing.T) { require.NotNil(t, p) traceID := pdata.NewTraceID([]byte{1, 2, 3, 4}) + p.ringBuffer.put(traceID) // test - err = p.onTraceRemoved(traceID) + err = p.onTraceExpired(traceID) // verify assert.True(t, errors.Is(err, expectedError)) @@ -502,9 +514,10 @@ func TestTraceNotFoundWhileRemovingTrace(t *testing.T) { require.NotNil(t, p) traceID := pdata.NewTraceID([]byte{1, 2, 3, 4}) + p.ringBuffer.put(traceID) // test - err = p.onTraceRemoved(traceID) + err = p.onTraceExpired(traceID) // verify assert.Error(t, err) @@ -745,6 +758,141 @@ func 
BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) {
 	}
 }
 
+type concurrencyTest struct {
+	name           string
+	numTraces      int
+	spansPerTrace  int
+	ringBufferSize int
+	waitDuration   time.Duration
+}
+
+func TestHighConcurrency(t *testing.T) {
+
+	tests := []concurrencyTest{
+		{
+			name:           "exceed_ring_buffer_size",
+			numTraces:      400,
+			spansPerTrace:  10,
+			ringBufferSize: 100,
+			waitDuration:   100 * time.Millisecond,
+		},
+		{
+			name:           "fast_wait_duration",
+			numTraces:      400,
+			spansPerTrace:  10,
+			ringBufferSize: 10000,
+			waitDuration:   1 * time.Millisecond,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+
+			traceIds, batches := generateTraces(test.numTraces, test.spansPerTrace)
+			// Shuffle the batches so that the spans for each trace will arrive in a random order
+			rand.Shuffle(len(batches), func(i, j int) { batches[i], batches[j] = batches[j], batches[i] })
+
+			expectedSpanCount := 0
+			for _, b := range batches {
+				expectedSpanCount += b.SpanCount()
+			}
+			require.EqualValues(t, test.numTraces*test.spansPerTrace, expectedSpanCount)
+
+			st := newMemoryStorage()
+			next := &exportertest.SinkTraceExporter{}
+
+			config := Config{
+				WaitDuration: test.waitDuration,
+				NumTraces:    test.ringBufferSize,
+			}
+
+			logger := zap.NewNop()
+			// For local debugging
+			//conf := zap.NewDevelopmentConfig()
+			//// Debug to help follow the operations on the trace
+			//conf.Level.SetLevel(zapcore.DebugLevel)
+			//logger, err := conf.Build()
+			//require.NoError(t, err)
+
+			p, err := newGroupByTraceProcessor(logger, st, next, config)
+			require.NoError(t, err)
+			// Swap the timer for an implementation we can wait on
+			timer := waitableTimer{}
+			p.timer = &timer
+
+			ctx := context.Background()
+			p.Start(ctx, nil)
+
+			wg := sync.WaitGroup{}
+			wg.Add(len(batches))
+
+			for _, b := range batches {
+				go func(batch pdata.Traces) {
+					_ = p.ConsumeTraces(context.Background(), batch)
+					wg.Done()
+				}(b)
+			}
+
+			// Wait until all calls to ConsumeTraces have completed
+			wg.Wait()
+
+			// Wait for all events to be emitted by the timer
+			timer.wg.Wait()
+
+			// All events have been emitted, this will wait until they are all consumed
+			p.Shutdown(ctx)
+
+			receivedTraceBatches := next.AllTraces()
+			// Ideally this would equal len(traceIds), but that's not always the case because of timing races between enqueue and dequeue
+			uniqTraceIdsToSpanCount := map[string]int{}
+			for _, batch := range receivedTraceBatches {
+				rs := batch.ResourceSpans()
+				for i := 0; i < rs.Len(); i++ {
+					ils := rs.At(i).InstrumentationLibrarySpans()
+					for k := 0; k < ils.Len(); k++ {
+						spans := ils.At(k).Spans()
+						for s := 0; s < spans.Len(); s++ {
+							traceId := spans.At(s).TraceID().HexString()
+							if _, ok := uniqTraceIdsToSpanCount[traceId]; !ok {
+								uniqTraceIdsToSpanCount[traceId] = 0
+							}
+							uniqTraceIdsToSpanCount[traceId] = uniqTraceIdsToSpanCount[traceId] + 1
+						}
+					}
+				}
+			}
+
+			assert.EqualValues(t, len(traceIds), len(uniqTraceIdsToSpanCount))
+
+			// Under the current buffer eviction code path it intentionally discards spans, but that seems wrong
+			// There is a separate bug in the release phase where the operation isn't atomic and spans can be released multiple times
+			for k, v := range uniqTraceIdsToSpanCount {
+				assert.EqualValues(t, test.spansPerTrace, v, "Trace %s should have %d spans", k, test.spansPerTrace)
+			}
+
+			assert.EqualValues(t, expectedSpanCount, next.SpansCount())
+		})
+	}
+}
+
+func generateTraces(numTraces int, numSpans int) ([][]byte, []pdata.Traces) {
+	traceIds := make([][]byte, numTraces)
+	var tds []pdata.Traces
+	for i := 
0; i < numTraces; i++ { + traceIds[i] = tracetranslator.UInt64ToByteTraceID(1, uint64(i+1)) + // Send each span in a separate batch + for j := 0; j < numSpans; j++ { + td := testdata.GenerateTraceDataOneSpan() + span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) + span.SetTraceID(pdata.NewTraceID(traceIds[i])) + span.SetSpanID(tracetranslator.UInt64ToByteSpanID(uint64(i<<5) + uint64(j+1))) + tds = append(tds, td) + } + } + + return traceIds, tds +} + type mockProcessor struct { onTraces func(context.Context, pdata.Traces) error } @@ -793,3 +941,28 @@ func (st *mockStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, err } return nil, nil } + +type waitableTimer struct { + wg sync.WaitGroup +} + +func (t *waitableTimer) AfterFunc(d time.Duration, f func()) { + t.wg.Add(1) + time.AfterFunc(d, func() { + defer t.wg.Done() + f() + }) +} + +type manualTimer struct { + funcs chan func() +} + +func (t *manualTimer) AfterFunc(d time.Duration, f func()) { + t.funcs <- f +} + +func (t *manualTimer) Step() { + f := <-t.funcs + f() +} diff --git a/processor/groupbytraceprocessor/storage_memory.go b/processor/groupbytraceprocessor/storage_memory.go index 8012198f90b..ed6ad3237b7 100644 --- a/processor/groupbytraceprocessor/storage_memory.go +++ b/processor/groupbytraceprocessor/storage_memory.go @@ -44,6 +44,8 @@ func (st *memoryStorage) createOrAppend(traceID pdata.TraceID, rs pdata.Resource sTraceID := traceID.HexString() st.Lock() + defer st.Unlock() + if _, ok := st.content[sTraceID]; !ok { st.content[sTraceID] = []pdata.ResourceSpans{} } @@ -52,14 +54,14 @@ func (st *memoryStorage) createOrAppend(traceID pdata.TraceID, rs pdata.Resource rs.CopyTo(newRS) st.content[sTraceID] = append(st.content[sTraceID], newRS) - st.Unlock() - return nil } func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, error) { sTraceID := traceID.HexString() st.RLock() + defer st.RUnlock() + rss, ok := st.content[sTraceID] if !ok { return nil, nil @@ -71,7 +73,6 @@ func (st *memoryStorage) get(traceID pdata.TraceID) ([]pdata.ResourceSpans, erro rs.CopyTo(newRS) result = append(result, newRS) } - st.RUnlock() return result, nil } @@ -82,6 +83,8 @@ func (st *memoryStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, e sTraceID := traceID.HexString() st.Lock() + defer st.Unlock() + rss := st.content[sTraceID] result := []pdata.ResourceSpans{} for _, rs := range rss { @@ -90,7 +93,6 @@ func (st *memoryStorage) delete(traceID pdata.TraceID) ([]pdata.ResourceSpans, e result = append(result, newRS) } delete(st.content, sTraceID) - st.Unlock() return result, nil }