Skip to content

Commit

Permalink
Add metrics to groupbytraceprocessor, wait for queue to be drained du…
Browse files Browse the repository at this point in the history
…ring shutdown.

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Sep 24, 2020
1 parent 430c002 commit 2ccd3d8
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 85 deletions.
257 changes: 181 additions & 76 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package groupbytraceprocessor

import (
"context"
"sync"
"time"

"go.opencensus.io/stats"
"go.uber.org/zap"

"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -34,9 +37,6 @@ const (

// traceID to be removed
traceRemoved

// shutdown
stop
)

type eventType int
Expand All @@ -51,7 +51,11 @@ type event struct {
// on the callbacks, otherwise, events might pile up. When enough events are piled up, firing an
// event will block until enough capacity is available to accept the events.
type eventMachine struct {
events chan event
events chan event
close chan struct{}
metricsCollectionInterval time.Duration
shutdownTimeout time.Duration

logger *zap.Logger

onTraceReceived func(pdata.Traces) error
Expand All @@ -68,84 +72,148 @@ type eventMachine struct {

// newEventMachine builds an eventMachine that logs through logger and buffers
// up to bufferSize events before fire blocks.
//
// The machine is created stopped; callers start it with startInBackground.
// Metrics about the queue are recorded every second and shutdown waits up to
// ten seconds for the queue to drain; both values are plain fields so that
// tests can shorten them.
func newEventMachine(logger *zap.Logger, bufferSize int) *eventMachine {
	return &eventMachine{
		logger:                    logger,
		events:                    make(chan event, bufferSize),
		close:                     make(chan struct{}),
		shutdownLock:              &sync.RWMutex{},
		metricsCollectionInterval: time.Second,
		shutdownTimeout:           10 * time.Second,
	}
}

// startInBackground launches the event loop (start) and the first run of the
// queue-size metrics recorder (periodicMetrics), each in its own goroutine,
// and returns without waiting for either.
func (em *eventMachine) startInBackground() {
go em.start()
go em.periodicMetrics()
}

// periodicMetrics records the current number of queued events and, unless the
// machine has been closed, schedules itself to run again after
// metricsCollectionInterval.
func (em *eventMachine) periodicMetrics() {
	numEvents := len(em.events)
	em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents))
	stats.Record(context.Background(), mNumEventsInQueue.M(int64(numEvents)))

	// shutdown flips em.closed while holding shutdownLock, so the flag has to
	// be read under the same lock to avoid a data race.
	em.shutdownLock.RLock()
	closed := em.closed
	em.shutdownLock.RUnlock()
	if closed {
		return
	}

	time.AfterFunc(em.metricsCollectionInterval, em.periodicMetrics)
}

func (em *eventMachine) start() {
for e := range em.events {
switch e.typ {
case traceReceived:
if em.onTraceReceived == nil {
em.logger.Debug("onTraceReceived not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.(pdata.Traces)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
}
em.onTraceReceived(payload)
case traceExpired:
if em.onTraceExpired == nil {
em.logger.Debug("onTraceExpired not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.(pdata.TraceID)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
}
em.onTraceExpired(payload)
case traceReleased:
if em.onTraceReleased == nil {
em.logger.Debug("onTraceReleased not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.([]pdata.ResourceSpans)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
}
em.onTraceReleased(payload)
case traceRemoved:
if em.onTraceRemoved == nil {
em.logger.Debug("onTraceRemoved not set, skipping event")
em.callOnError(e)
continue
}
payload, ok := e.payload.(pdata.TraceID)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
continue
}
em.onTraceRemoved(payload)
case stop:
em.logger.Info("shuttting down the event machine")
em.shutdownLock.Lock()
em.closed = true
em.shutdownLock.Unlock()
e.payload.(*sync.WaitGroup).Done()
for {
select {
case e := <-em.events:
em.handleEvent(e)
case <-em.close:
return
}
}
}

func (em *eventMachine) handleEvent(e event) {
switch e.typ {
case traceReceived:
if em.onTraceReceived == nil {
em.logger.Debug("onTraceReceived not set, skipping event")
em.callOnError(e)
return
}
payload, ok := e.payload.(pdata.Traces)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
return
}

succeeded, err := doWithTimeoutMeasured(mEventLatencyOnTraceReceived, func() error {
return em.onTraceReceived(payload)
})
if err != nil {
em.logger.Error("failed to mark trace as received", zap.Error(err))
}
if succeeded {
em.logger.Debug("onTraceReceived finished")
} else {
em.logger.Debug("onTraceReceived aborted")
}
case traceExpired:
if em.onTraceExpired == nil {
em.logger.Debug("onTraceExpired not set, skipping event")
em.callOnError(e)
return
}
payload, ok := e.payload.(pdata.TraceID)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
return
default:
em.logger.Info("unknown event type", zap.Any("event", e.typ))
}

succeeded, err := doWithTimeoutMeasured(mEventLatencyOnTraceExpired, func() error {
return em.onTraceExpired(payload)
})
if err != nil {
em.logger.Error("failed to mark trace as expired", zap.Error(err))
}
if succeeded {
em.logger.Debug("onTraceExpired finished")
} else {
em.logger.Debug("onTraceExpired aborted")
}
case traceReleased:
if em.onTraceReleased == nil {
em.logger.Debug("onTraceReleased not set, skipping event")
em.callOnError(e)
continue
return
}
payload, ok := e.payload.([]pdata.ResourceSpans)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
return
}

succeeded, err := doWithTimeoutMeasured(mEventLatencyOnTraceReleased, func() error {
return em.onTraceReleased(payload)
})
if err != nil {
em.logger.Error("failed to mark trace as released", zap.Error(err))
}
if succeeded {
em.logger.Debug("onTraceReleased finished")
} else {
em.logger.Debug("onTraceReleased aborted")
}
case traceRemoved:
if em.onTraceRemoved == nil {
em.logger.Debug("onTraceRemoved not set, skipping event")
em.callOnError(e)
return
}
payload, ok := e.payload.(pdata.TraceID)
if !ok {
// the payload had an unexpected type!
em.callOnError(e)
return
}

succeeded, err := doWithTimeoutMeasured(mEventLatencyOnTraceRemoved, func() error {
return em.onTraceRemoved(payload)
})
if err != nil {
em.logger.Error("failed to mark trace as removed", zap.Error(err))
}
if succeeded {
em.logger.Debug("onTraceRemoved finished")
} else {
em.logger.Debug("onTraceRemoved aborted")
}
default:
em.logger.Info("unknown event type", zap.Any("event", e.typ))
em.callOnError(e)
return
}
}

Expand All @@ -164,17 +232,54 @@ func (em *eventMachine) fire(events ...event) {
}

func (em *eventMachine) shutdown() {
wg := &sync.WaitGroup{}
wg.Add(1)
em.events <- event{
typ: stop,
payload: wg,
em.logger.Info("shutting down the event manager", zap.Int("pending-events", len(em.events)))
em.shutdownLock.Lock()
em.closed = true
em.shutdownLock.Unlock()

// we never return an error here
ok, _ := doWithTimeout(em.shutdownTimeout, func() error {
for {
if len(em.events) == 0 {
return nil
}
time.Sleep(100 * time.Millisecond)
}
})

if !ok {
em.logger.Info("forcing the shutdown of the event manager", zap.Int("pending-events", len(em.events)))
}
wg.Wait()
close(em.close)
}

// callOnError forwards the event to the onError callback; it is a no-op when
// no such callback has been registered.
func (em *eventMachine) callOnError(e event) {
	if em.onError == nil {
		return
	}
	em.onError(e)
}

// doWithTimeoutMeasured runs do through doWithTimeout with a one second
// timeout and records the elapsed wall-clock time, in milliseconds, on the
// given measure. The time is recorded whether do finished, failed or timed
// out. The results of doWithTimeout are returned unchanged.
func doWithTimeoutMeasured(measure *stats.Int64Measure, do func() error) (bool, error) {
	begin := time.Now()
	succeeded, err := doWithTimeout(time.Second, do)
	elapsedMs := time.Since(begin).Milliseconds()
	stats.Record(context.Background(), measure.M(elapsedMs))
	return succeeded, err
}

// doWithTimeout runs do in its own goroutine and waits at most timeout for it
// to return.
//
// It returns (true, err) when do returned within the timeout, err being
// whatever do returned: a function that fails in time still counts as having
// succeeded. It returns (false, nil) when the timeout elapsed first. In that
// case do keeps running in the background and its result is discarded.
func doWithTimeout(timeout time.Duration, do func() error) (bool, error) {
	// One slot of buffer lets the worker deliver its result and exit even
	// when nobody is receiving anymore; with an unbuffered channel it would
	// stay blocked on the send forever after a timeout.
	done := make(chan error, 1)
	go func() {
		done <- do()
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return false, nil
	case err := <-done:
		return true, err
	}
}
Loading

0 comments on commit 2ccd3d8

Please sign in to comment.