Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Deadlock in groupbytrace #1847

Closed
wants to merge 9 commits into from
70 changes: 16 additions & 54 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ const (
// traceID to be released
traceExpired

// released traces
traceReleased

// traceID to be removed
traceRemoved

// shutdown
stop
)
Expand All @@ -56,21 +50,18 @@ type eventMachine struct {

onTraceReceived func(pdata.Traces) error
onTraceExpired func(pdata.TraceID) error
onTraceReleased func([]pdata.ResourceSpans) error
onTraceRemoved func(pdata.TraceID) error

onError func(event)

// shutdown sync
shutdownLock *sync.RWMutex
closed bool
done chan struct{}
}

func newEventMachine(logger *zap.Logger, bufferSize int) *eventMachine {
em := &eventMachine{
logger: logger,
events: make(chan event, bufferSize),
shutdownLock: &sync.RWMutex{},
logger: logger,
events: make(chan event, bufferSize),
done: make(chan struct{}),
}
return em
}
Expand All @@ -94,7 +85,9 @@ func (em *eventMachine) start() {
em.callOnError(e)
continue
}
em.onTraceReceived(payload)
if err := em.onTraceReceived(payload); err != nil {
em.logger.Debug("onTraceReceived failed", zap.Error(err))
}
case traceExpired:
if em.onTraceExpired == nil {
em.logger.Debug("onTraceExpired not set, skipping event")
Expand All @@ -107,38 +100,12 @@ func (em *eventMachine) start() {
em.callOnError(e)
continue
}
em.onTraceExpired(payload)
case traceReleased:
if em.onTraceReleased == nil {
em.logger.Debug("onTraceReleased not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.([]pdata.ResourceSpans)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
if err := em.onTraceExpired(payload); err != nil {
em.logger.Debug("onTraceExpired failed", zap.Error(err))
}
em.onTraceReleased(payload)
case traceRemoved:
if em.onTraceRemoved == nil {
em.logger.Debug("onTraceRemoved not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.(pdata.TraceID)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
}
em.onTraceRemoved(payload)
case stop:
em.logger.Info("shuttting down the event machine")
em.shutdownLock.Lock()
em.closed = true
em.shutdownLock.Unlock()
em.logger.Info("shutting down the event machine")
close(em.done)
e.payload.(*sync.WaitGroup).Done()
return
default:
Expand All @@ -149,17 +116,12 @@ func (em *eventMachine) start() {
}
}

func (em *eventMachine) fire(events ...event) {
em.shutdownLock.RLock()
defer em.shutdownLock.RUnlock()

// we are not accepting new events
if em.closed {
func (em *eventMachine) fire(event event) {
select {
case em.events <- event:
return
case <-em.done:
return
}

for _, e := range events {
em.events <- e
}
}

Expand Down
49 changes: 0 additions & 49 deletions processor/groupbytraceprocessor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,6 @@ func TestEventCallback(t *testing.T) {
}
},
},
{
casename: "onTraceReleased",
typ: traceReleased,
payload: []pdata.ResourceSpans{},
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceReleased = func(expired []pdata.ResourceSpans) error {
wg.Done()
return nil
}
},
},
{
casename: "onTraceRemoved",
typ: traceRemoved,
payload: pdata.NewTraceID([]byte{1, 2, 3, 4}),
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceRemoved = func(expired pdata.TraceID) error {
wg.Done()
assert.Equal(t, pdata.NewTraceID([]byte{1, 2, 3, 4}), expired)
return nil
}
},
},
} {
t.Run(tt.casename, func(t *testing.T) {
// prepare
Expand Down Expand Up @@ -118,14 +95,6 @@ func TestEventCallbackNotSet(t *testing.T) {
casename: "onTraceExpired",
typ: traceExpired,
},
{
casename: "onTraceReleased",
typ: traceReleased,
},
{
casename: "onTraceRemoved",
typ: traceRemoved,
},
} {
t.Run(tt.casename, func(t *testing.T) {
// prepare
Expand Down Expand Up @@ -176,24 +145,6 @@ func TestEventInvalidPayload(t *testing.T) {
}
},
},
{
casename: "onTraceReleased",
typ: traceReleased,
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceReleased = func(expired []pdata.ResourceSpans) error {
return nil
}
},
},
{
casename: "onTraceRemoved",
typ: traceRemoved,
registerCallback: func(em *eventMachine, wg *sync.WaitGroup) {
em.onTraceRemoved = func(expired pdata.TraceID) error {
return nil
}
},
},
} {
t.Run(tt.casename, func(t *testing.T) {
// prepare
Expand Down
88 changes: 44 additions & 44 deletions processor/groupbytraceprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -55,6 +56,12 @@ type groupByTraceProcessor struct {

// the trace storage
st storage

// keeps track of outstanding goroutines
inprogressWg sync.WaitGroup

// for easier testing of time.AfterFunc
timer tTimer
}

var _ component.TraceProcessor = (*groupByTraceProcessor)(nil)
Expand All @@ -71,13 +78,13 @@ func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consu
eventMachine: eventMachine,
ringBuffer: newRingBuffer(config.NumTraces),
st: st,
inprogressWg: sync.WaitGroup{},
timer: &groupbyTimer{},
}

// register the callbacks
eventMachine.onTraceReceived = sp.onTraceReceived
eventMachine.onTraceExpired = sp.onTraceExpired
eventMachine.onTraceReleased = sp.onTraceReleased
eventMachine.onTraceRemoved = sp.onTraceRemoved

return sp, nil
}
Expand All @@ -103,6 +110,7 @@ func (sp *groupByTraceProcessor) Start(context.Context, component.Host) error {
// Shutdown is invoked during service shutdown.
func (sp *groupByTraceProcessor) Shutdown(_ context.Context) error {
sp.eventMachine.shutdown()
sp.inprogressWg.Wait()
return nil
}

Expand Down Expand Up @@ -150,15 +158,20 @@ func (sp *groupByTraceProcessor) processBatch(batch *singleTraceBatch) error {
// place the trace ID in the buffer, and check if an item had to be evicted
evicted := sp.ringBuffer.put(traceID)
if evicted.Bytes() != nil {
// delete from the storage
sp.eventMachine.fire(event{
typ: traceRemoved,
payload: evicted,
})

// TODO: do we want another channel that receives evicted items? record a metric perhaps?
sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory",
zap.String("traceID", evicted.HexString()))
// delete from the storage
trace, err := sp.st.delete(evicted)
if err != nil {
sp.logger.Info(fmt.Sprintf("couldn't delete trace %q from the storage: %s", evicted.HexString(), err.Error()))
} else if trace == nil {
sp.logger.Info(fmt.Sprintf("trace %q not found at the storage", evicted.HexString()))
} else {
// no need to wait on the nextConsumer
sp.inprogressWg.Add(1)
go sp.releaseTrace(trace)
}
}

// we have the traceID in the memory, place the spans in the storage too
Expand All @@ -168,7 +181,7 @@ func (sp *groupByTraceProcessor) processBatch(batch *singleTraceBatch) error {

sp.logger.Debug("scheduled to release trace", zap.Duration("duration", sp.config.WaitDuration))

time.AfterFunc(sp.config.WaitDuration, func() {
sp.timer.AfterFunc(sp.config.WaitDuration, func() {
// if the event machine has stopped, it will just discard the event
sp.eventMachine.fire(event{
typ: traceExpired,
Expand All @@ -194,59 +207,35 @@ func (sp *groupByTraceProcessor) onTraceExpired(traceID pdata.TraceID) error {
// delete from the map and erase its memory entry
sp.ringBuffer.delete(traceID)

// this might block, but we don't need to wait
sp.logger.Debug("marking the trace as released",
zap.String("traceID", traceID.HexString()))
go sp.markAsReleased(traceID)

return nil
}

func (sp *groupByTraceProcessor) markAsReleased(traceID pdata.TraceID) error {
// #get is a potentially blocking operation
trace, err := sp.st.get(traceID)
trace, err := sp.st.delete(traceID)
if err != nil {
return fmt.Errorf("couldn't retrieve trace %q from the storage: %w", traceID, err)
return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.HexString(), err)
}

if trace == nil {
return fmt.Errorf("the trace %q couldn't be found at the storage", traceID)
return fmt.Errorf("trace %q not found at the storage", traceID.HexString())
}
// no need to wait on the nextConsumer
sp.inprogressWg.Add(1)
go sp.releaseTrace(trace)

// signal that the trace is ready to be released
sp.logger.Debug("trace marked as released", zap.String("traceID", traceID.HexString()))

// atomically fire the two events, so that a concurrent shutdown won't leave
// an orphaned trace in the storage
sp.eventMachine.fire(event{
typ: traceReleased,
payload: trace,
}, event{
typ: traceRemoved,
payload: traceID,
})
return nil
}

func (sp *groupByTraceProcessor) onTraceReleased(rss []pdata.ResourceSpans) error {
func (sp *groupByTraceProcessor) releaseTrace(rss []pdata.ResourceSpans) {
defer sp.inprogressWg.Done()

trace := pdata.NewTraces()
for _, rs := range rss {
trace.ResourceSpans().Append(rs)
}
return sp.nextConsumer.ConsumeTraces(context.Background(), trace)
}

func (sp *groupByTraceProcessor) onTraceRemoved(traceID pdata.TraceID) error {
trace, err := sp.st.delete(traceID)
if err != nil {
return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID.HexString(), err)
}

if trace == nil {
return fmt.Errorf("trace %q not found at the storage", traceID.HexString())
if err := sp.nextConsumer.ConsumeTraces(context.Background(), trace); err != nil {
sp.logger.Debug("next processor failed to process trace", zap.Error(err))
}

return nil
}

func (sp *groupByTraceProcessor) addSpans(traceID pdata.TraceID, trace pdata.ResourceSpans) error {
Expand Down Expand Up @@ -310,3 +299,14 @@ func splitByTrace(rs pdata.ResourceSpans) []*singleTraceBatch {

return result
}

// tTimer interface allows easier testing of ticker related functionality used by groupbytraceprocessor
type tTimer interface {
// AfterFunc schedules f to run once after duration d (mirrors the contract of time.AfterFunc).
AfterFunc(d time.Duration, f func())
}

// groupbyTimer is the production implementation of tTimer; it simply delegates to the standard library.
type groupbyTimer struct{}

// AfterFunc implements tTimer by forwarding to time.AfterFunc.
func (*groupbyTimer) AfterFunc(d time.Duration, f func()) {
time.AfterFunc(d, f)
}
Loading