Unify tracking of metrics and events in CloudHandler #366

Open · wants to merge 1 commit into base: master
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
+Next
+----
+- Breaking internal metrics: This removes the `cloudprovider.items_queued` metric, and now tracks
+  the absolute number of hosts to look up, regardless of type.
+
 34.0.1
 ------
 - Forcing a new tag release to allow for docker release
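In practice the change collapses three emissions into one. A minimal before/after sketch, using an invented stand-in type rather than the real stats.Statser interface:

```go
package main

import "fmt"

// statser is a stand-in for gostatsd's stats.Statser; only Gauge is sketched.
type statser struct{}

func (statser) Gauge(name string, value float64, tags []string) {
	fmt.Println(name, value, tags)
}

func main() {
	var s statser
	// Before: two tagged hosts_queued gauges plus items_queued.
	s.Gauge("cloudprovider.hosts_queued", 2, []string{"type:metric"})
	s.Gauge("cloudprovider.hosts_queued", 1, []string{"type:event"})
	s.Gauge("cloudprovider.items_queued", 5, []string{"type:event"})
	// After: one untagged gauge counting every host with a pending lookup,
	// whether it is waiting on metrics, events, or both.
	s.Gauge("cloudprovider.hosts_queued", 3, nil)
}
```

Reading the old code further down, a host was only ever counted under one type (whichever queued it first), so the new untagged value should match the old gauges summed across the type tag.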
4 changes: 1 addition & 3 deletions METRICS.md
@@ -53,8 +53,7 @@ Metrics:
 | cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data
 | cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache)
 | cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses
-| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up
-| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete
+| cloudprovider.hosts_queued | gauge (flush) | | The absolute number of hosts waiting to be looked up
 | http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward
 | http.forwarder.created | counter | | The number of batches prepared for forwarding
 | http.forwarder.sent | counter | | The number of batches successfully forwarded
@@ -70,7 +69,6 @@ Metrics:
 | version | The git tag of the build
 | commit | The short git commit of the build
 | backend | The backend sending a particular metric
-| type | Either metric or event for cloudprovider.hosts_queued, or event for cloudprovider.items_queued
 | result | Success to indicate a batch of metrics was successfully processed, failure to indicate a batch of metrics was not processed, with additional failure tag for why)
 | failure | The reason a batch of metrics was not processed
 | server-name | The name of an http-server as specified in the config file
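Two gauge styles appear in this table: a "gauge (cumulative)" carries a running total forward across flushes, while a "gauge (flush)" is re-measured each time. A small illustrative sketch of the difference (names are invented, not the gostatsd implementation):

```go
package main

import "fmt"

// Illustrative only: cacheHits mimics a cumulative gauge, pending a flush gauge.
type handlerStats struct {
	cacheHits uint64         // cumulative: only ever increases
	pending   map[string]int // flush: measured at emit time
}

func (h *handlerStats) emit() {
	// cloudprovider.cache_hit reports the total since startup...
	fmt.Println("cloudprovider.cache_hit", h.cacheHits)
	// ...while cloudprovider.hosts_queued reports the current queue depth.
	fmt.Println("cloudprovider.hosts_queued", len(h.pending))
}

func main() {
	h := &handlerStats{cacheHits: 42, pending: map[string]int{"10.0.0.1": 1}}
	h.emit() // cache_hit: 42 (carried forward); hosts_queued: 1 (re-measured)
}
```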
2 changes: 1 addition & 1 deletion pkg/statsd/handler_backend.go
@@ -167,7 +167,7 @@ func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
 	for _, backend := range bh.backends {
 		select {
 		case <-ctx.Done():
-			// Not all backends got the event, should decrement the wg counter
+			// Not all backends got the event, should decrement the wg counter to account for it
 			bh.eventWg.Add(eventsDispatched - len(bh.backends))
 			return
 		case bh.concurrentEvents <- struct{}{}:
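The reworded comment describes a compensation pattern: the WaitGroup is incremented for every backend up front, and on cancellation the shortfall is subtracted in a single call. A minimal standalone sketch of the same pattern (illustrative names, not the gostatsd code):

```go
package main

import (
	"context"
	"sync"
)

// dispatch adds len(backends) to the WaitGroup itself, mirroring the shape
// of BackendHandler.DispatchEvent as far as the diff above shows it.
func dispatch(ctx context.Context, wg *sync.WaitGroup, backends []chan struct{}) {
	dispatched := 0
	wg.Add(len(backends)) // optimistically assume every backend gets the event
	for _, b := range backends {
		select {
		case <-ctx.Done():
			// Not all backends got the event; add the (negative) difference
			// so waiters are not blocked on sends that never happened.
			wg.Add(dispatched - len(backends))
			return
		case b <- struct{}{}:
			dispatched++
		}
	}
}

func main() {
	var wg sync.WaitGroup
	backends := []chan struct{}{make(chan struct{}, 1), make(chan struct{}, 1)}
	dispatch(context.Background(), &wg, backends)
	for _, b := range backends {
		<-b       // a real backend would process the event here...
		wg.Done() // ...and mark it complete
	}
	wg.Wait()
}
```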
102 changes: 49 additions & 53 deletions pkg/statsd/handler_cloud.go
@@ -11,28 +11,28 @@ import (
 	"github.com/atlassian/gostatsd/pkg/stats"
 )
 
+type pendingMetricsAndEvents struct {
+	metrics *gostatsd.MetricMap
+	events  []*gostatsd.Event
+}
+
 // CloudHandler enriches metrics and events with additional information fetched from cloud provider.
 type CloudHandler struct {
 	// These fields are accessed by any go routine, must use atomic ops
 	statsCacheHit  uint64 // Cumulative number of cache hits
 	statsCacheMiss uint64 // Cumulative number of cache misses
 
-	// All other stats fields may only be read or written by the main CloudHandler.Run goroutine
-	statsMetricHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for metrics
-	statsEventItemsQueued  uint64 // Absolute number of events queued, waiting for a CP to respond
-	statsEventHostsQueued  uint64 // Absolute number of IPs waiting for a CP to respond for events
-
 	cachedInstances gostatsd.CachedInstances
 	handler         gostatsd.PipelineHandler
 	incomingMetrics chan *gostatsd.MetricMap
 	incomingEvents  chan *gostatsd.Event
 
-	awaitingEvents  map[gostatsd.Source][]*gostatsd.Event
-	awaitingMetrics map[gostatsd.Source]*gostatsd.MetricMap
-	emitChan        chan stats.Statser
+	// emitChan triggers a write of all the current stats when it is given a Statser
+	emitChan chan stats.Statser
 
+	perHostPending  map[gostatsd.Source]*pendingMetricsAndEvents
 	toLookupIPs     []gostatsd.Source
-	wg              sync.WaitGroup
+	wgPendingEvents sync.WaitGroup
 
 	estimatedTags int
 }
@@ -45,8 +45,7 @@ func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd.
 		incomingMetrics: make(chan *gostatsd.MetricMap),
 		incomingEvents:  make(chan *gostatsd.Event),
 		emitChan:        make(chan stats.Statser),
-		awaitingEvents:  make(map[gostatsd.Source][]*gostatsd.Event),
-		awaitingMetrics: make(map[gostatsd.Source]*gostatsd.MetricMap),
+		perHostPending:  make(map[gostatsd.Source]*pendingMetricsAndEvents),
 		estimatedTags:   handler.EstimatedTags() + cachedInstances.EstimatedTags(),
 	}
 }
@@ -105,17 +104,17 @@ func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) {
 		ch.handler.DispatchEvent(ctx, e)
 		return
 	}
-	ch.wg.Add(1) // Increment before sending to the channel
+	ch.wgPendingEvents.Add(1) // Increment before sending to the channel
 	select {
 	case <-ctx.Done():
-		ch.wg.Done()
+		ch.wgPendingEvents.Done()
 	case ch.incomingEvents <- e:
 	}
 }
 
 // WaitForEvents waits for all event-dispatching goroutines to finish.
 func (ch *CloudHandler) WaitForEvents() {
-	ch.wg.Wait()
+	ch.wgPendingEvents.Wait()
 	ch.handler.WaitForEvents()
 }
 
@@ -160,11 +159,8 @@ func (ch *CloudHandler) emit(statser stats.Statser) {
 	// atomic
 	statser.Gauge("cloudprovider.cache_hit", float64(atomic.LoadUint64(&ch.statsCacheHit)), nil)
 	statser.Gauge("cloudprovider.cache_miss", float64(atomic.LoadUint64(&ch.statsCacheMiss)), nil)
-	t := gostatsd.Tags{"type:metric"}
-	statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsMetricHostsQueued), t)
-	t = gostatsd.Tags{"type:event"}
-	statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsEventHostsQueued), t)
-	statser.Gauge("cloudprovider.items_queued", float64(ch.statsEventItemsQueued), t)
+	// non-atomic
+	statser.Gauge("cloudprovider.hosts_queued", float64(len(ch.perHostPending)), nil)
 }
 
 func (ch *CloudHandler) Run(ctx context.Context) {
@@ -184,10 +180,8 @@ func (ch *CloudHandler) Run(ctx context.Context) {
 		case info := <-infoSource:
 			ch.handleInstanceInfo(ctx, info)
 		case metrics := <-ch.incomingMetrics:
-			// Add metrics to awaitingMetrics, accumulate IPs to lookup
 			ch.handleIncomingMetrics(metrics)
 		case e := <-ch.incomingEvents:
-			// Add event to awaitingEvents, accumulate IPs to lookup
 			ch.handleIncomingEvent(e)
 		case statser := <-ch.emitChan:
 			ch.emit(statser)
@@ -203,38 +197,46 @@ func (ch *CloudHandler) Run(ctx context.Context) {
 }
 
 func (ch *CloudHandler) handleInstanceInfo(ctx context.Context, info gostatsd.InstanceInfo) {
-	mm := ch.awaitingMetrics[info.IP]
-	if mm != nil {
-		delete(ch.awaitingMetrics, info.IP)
-		ch.statsMetricHostsQueued--
-		go ch.updateAndDispatchMetrics(ctx, info.Instance, mm)
+	pending, ok := ch.perHostPending[info.IP]
+	if !ok {
+		return // got an instance for something we didn't request, ignore it.
 	}
-	events := ch.awaitingEvents[info.IP]
-	if len(events) > 0 {
-		delete(ch.awaitingEvents, info.IP)
-		ch.statsEventItemsQueued -= uint64(len(events))
-		ch.statsEventHostsQueued--
-		go ch.updateAndDispatchEvents(ctx, info.Instance, events)
+
+	delete(ch.perHostPending, info.IP)
+	if pending.metrics != nil {
+		go ch.updateAndDispatchMetrics(ctx, info.Instance, pending.metrics)
 	}
+	if len(pending.events) > 0 {
+		go ch.updateAndDispatchEvents(ctx, info.Instance, pending.events)
+	}
 }
 
-// prepareMetricQueue will ensure that ch.awaitingMetrics has a matching MetricMap for
-// source, and return it. If it did not have one initially, it will also enqueue source
-// for lookup. The functionality is overloaded to minimize code duplication.
-func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
-	if queue, ok := ch.awaitingMetrics[source]; ok {
-		return queue
-	}
-	if len(ch.awaitingEvents[source]) == 0 {
+// preparePending will return a place to queue things that are waiting to be processed,
+// and ensure that source will be looked up if it wasn't already.
+func (ch *CloudHandler) preparePending(source gostatsd.Source) *pendingMetricsAndEvents {
+	if _, ok := ch.perHostPending[source]; !ok {
+		ch.perHostPending[source] = &pendingMetricsAndEvents{}
 		ch.toLookupIPs = append(ch.toLookupIPs, source)
-		ch.statsMetricHostsQueued++
 	}
-	queue := gostatsd.NewMetricMap()
-	ch.awaitingMetrics[source] = queue
-	return queue
+	return ch.perHostPending[source]
 }
 
+// prepareMetricQueue will ensure that ch.perHostPending has a matching MetricMap for
+// the provided source and return it.
+func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
+	queue := ch.preparePending(source)
+	if queue.metrics == nil {
+		// There might be value in pushing this to preparePending, since the split is
+		// really only beneficial if a host is only sending events and not metrics, and
+		// this adds an extra comparison to every lookup.
+		queue.metrics = gostatsd.NewMetricMap()
+	}
+	return queue.metrics
+}
+
 func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
+	// The <metric>.Source values could be from different hosts if they were
+	// forwarded, therefore we need to do a lookup each time.
 	mm.Counters.Each(func(metricName string, tagsKey string, c gostatsd.Counter) {
 		ch.prepareMetricQueue(c.Source).MergeCounter(metricName, tagsKey, c)
 	})
@@ -250,14 +252,8 @@ func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
 }
 
 func (ch *CloudHandler) handleIncomingEvent(e *gostatsd.Event) {
-	queue := ch.awaitingEvents[e.Source]
-	ch.awaitingEvents[e.Source] = append(queue, e)
-	if len(queue) == 0 && ch.awaitingMetrics[e.Source] == nil {
-		// This is the first event for that IP in the queue. Need to fetch an Instance for this IP.
-		ch.toLookupIPs = append(ch.toLookupIPs, e.Source)
-		ch.statsEventHostsQueued++
-	}
-	ch.statsEventItemsQueued++
+	queue := ch.preparePending(e.Source)
+	queue.events = append(queue.events, e)
 }
 
 func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, mmIn *gostatsd.MetricMap) {
@@ -284,7 +280,7 @@ func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, mmIn *gostatsd.MetricMap) {
 func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *gostatsd.Instance, events []*gostatsd.Event) {
 	var dispatched int
 	defer func() {
-		ch.wg.Add(-dispatched)
+		ch.wgPendingEvents.Add(-dispatched)
 	}()
 	for _, e := range events {
 		updateInplace(e, instance)
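Taken together, the change replaces the two per-source maps (and four counters) with one map of pendingMetricsAndEvents, so the first metric or event for a source queues exactly one lookup and later items pile onto the same entry; it also makes the emit path trivial, since len(perHostPending) is exactly the number of hosts awaiting a lookup. A condensed sketch of that behaviour with simplified stand-in types (not the real gostatsd ones):

```go
package main

import "fmt"

type source string

// pending is a simplified stand-in for pendingMetricsAndEvents.
type pending struct {
	metrics []string
	events  []string
}

type cloudHandler struct {
	perHostPending map[source]*pending
	toLookupIPs    []source
}

// preparePending mirrors CloudHandler.preparePending: create the entry and
// enqueue a lookup only the first time a source is seen.
func (ch *cloudHandler) preparePending(s source) *pending {
	if _, ok := ch.perHostPending[s]; !ok {
		ch.perHostPending[s] = &pending{}
		ch.toLookupIPs = append(ch.toLookupIPs, s)
	}
	return ch.perHostPending[s]
}

func main() {
	ch := &cloudHandler{perHostPending: make(map[source]*pending)}
	p := ch.preparePending("10.0.0.1")
	p.metrics = append(p.metrics, "cpu.load")
	p = ch.preparePending("10.0.0.1") // same source again, this time for an event
	p.events = append(p.events, "deploy.finished")
	// One host queued for lookup, regardless of how many item types it has.
	fmt.Println(len(ch.toLookupIPs), len(ch.perHostPending)) // 1 1
}
```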