Merge pull request #629 from atlassian/mgs/improve-forwarder-metrics
Feature: Improve internal observability when running in forwarder mode
MovieStoreGuy authored Jul 31, 2023
2 parents 9d285b3 + e0ae292 commit 31ebc74
Showing 14 changed files with 152 additions and 89 deletions.
96 changes: 48 additions & 48 deletions METRICS.md
@@ -11,58 +11,58 @@ Metric types:
| gauge (cumulative) | An internal counter sent as a gauge with the value never resetting
| gauge (sparse) | The same as a cumulative gauge, but data is only sent on change
| counter | An internal counter, reset on flush
| reported | Emitted as a gauge for services in standalone mode, and as a counter (reset on flush) for forwarders
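
To make the new reported type concrete with illustrative numbers: if an internal value has reached 10 by the first flush and 25 by the second, a standalone service emits gauge samples 10 and 25, while a forwarder emits counter samples 10 and 15. Below is a minimal sketch of that dispatch, assuming it sits next to the `Statser` interface in `pkg/stats`; the real implementation is `InternalStatser.Report`, further down in this diff.

```go
package stats

import (
	"sync/atomic"

	"github.com/atlassian/gostatsd"
)

// report sketches the "reported" semantics from the table above: a
// forwarder drains the counter on every flush (reset-on-flush counter),
// while a standalone service sends the cumulative value as a gauge.
func report(forwarderMode bool, statser Statser, name string, value *uint64, tags gostatsd.Tags) {
	if forwarderMode {
		statser.Count(name, float64(atomic.SwapUint64(value, 0)), tags)
	} else {
		statser.Gauge(name, float64(atomic.LoadUint64(value)), tags)
	}
}
```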


Metrics:

| Name | type | tags | description
| ------------------------------------------- | ------------------- | ---------------------------- | -----------
| aggregator.metricmaps_received | gauge (flush) | aggregator_id | The number of datapoint batches received during the flush interval
| aggregator.aggregation_time | gauge (time) | aggregator_id | The time taken (in ms) to aggregate all counter and timer
| | | | datapoints in this flush interval
| aggregator.process_time | gauge (time) | aggregator_id | The time taken to process all synchronous flush actions
| aggregator.reset_time | gauge (time) | aggregator_id | The time taken to reset the aggregator after flush
| parser.bad_lines_seen | gauge (sparse) | | The number of unparseable lines
| parser.events_received | gauge (cumulative) | | The number of events parsed
| parser.metrics_received | gauge (cumulative) | | The number of metrics parsed
| receiver.datagrams_received | gauge (cumulative) | | The number of datagrams received
| receiver.avg_datagrams_in_batch | gauge (flush) | | The average number of datagrams per batch (up to receive-batch-size). This
| | | | can be used to tweak receive-batch-size if necessary to reduce memory usage.
| channel.avg | gauge (flush) | channel | The average of all samples in the flush interval
| channel.min | gauge (flush) | channel | The minimum sample seen
| channel.max | gauge (flush) | channel | The maximum sample seen
| channel.last | gauge (flush) | channel | The last sample seen
| channel.capacity | gauge (flush) | channel | The capacity of the channel
| channel.samples | gauge (flush) | channel | The number of samples seen (guaranteed to be at least 1)
| heartbeat | gauge (flush) | version, commit | The value 1, tagged by the version (git tag) and short commit hash
| flusher.total_time | gauge (time) | | Time taken to flush all metrics to all backends for the flush interval
| backend.created | gauge (cumulative) | backend | Lifetime number of metric batches generated by the backend
| backend.create.failed | gauge (cumulative) | backend | Lifetime number of metric batches which failed to be serialized (DATALOSS!)
| backend.retried | gauge (sparse) | backend | Lifetime number of metric batches retried by the backend
| backend.dropped | gauge (cumulative) | backend | Lifetime number of metric batches dropped by the backend (DATALOSS!)
| backend.sent | gauge (cumulative) | backend | Lifetime number of metric batches successfully transmitted
| backend.series.sent | gauge (cumulative) | backend | Lifetime number of metric series successfully transmitted
| cloudprovider.aws.describeinstancecount | gauge (cumulative) | | The cumulative number of times DescribeInstancesPages has been called
| cloudprovider.aws.describeinstanceinstances | gauge (cumulative) | | The cumulative number of instances which have been fed in to DescribeInstancesPages
| cloudprovider.aws.describeinstancepages | gauge (cumulative) | | The cumulative number of pages from DescribeInstancesPages
| cloudprovider.aws.describeinstanceerrors | gauge (cumulative) | | The cumulative number of errors seen from DescribeInstancesPages
| cloudprovider.aws.describeinstancefound | gauge (cumulative) | | The cumulative number of instances successfully found via DescribeInstances
| cloudprovider.cache_positive | gauge (flush) | | The absolute number of positive entries in the cache
| cloudprovider.cache_negative | gauge (flush) | | The absolute number of negative entries in the cache
| cloudprovider.cache_refresh_positive | gauge (cumulative) | | The cumulative number of positive refreshes
| cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data
| cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache)
| cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses
| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up
| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete
| http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward
| http.forwarder.created | counter | | The number of batches prepared for forwarding
| http.forwarder.sent | counter | | The number of batches successfully forwarded
| http.forwarder.retried | counter | | The number of retries sending a batch
| http.forwarder.dropped | counter | | The number of batches dropped due to inability to forward upstream
| http.incoming | counter | server-name, result, failure | The number of batches forwarded to the server, and the results of processing them
| http.incoming.metrics | counter | server-name | The number of metrics received over http

| Name | type | tags | description
| ------------------------------------------- | --------------------- | ---------------------------- | -----------
| aggregator.metricmaps_received | gauge (flush) | aggregator_id | The number of datapoint batches received during the flush interval
| aggregator.aggregation_time | gauge (time) | aggregator_id | The time taken (in ms) to aggregate all counter and timer
| | | | datapoints in this flush interval
| aggregator.process_time | gauge (time) | aggregator_id | The time taken to process all synchronous flush actions
| aggregator.reset_time | gauge (time) | aggregator_id | The time taken to reset the aggregator after flush
| parser.bad_lines_seen | gauge (sparse) | | The number of unparseable lines
| parser.events_received | reported | | The number of events parsed
| parser.metrics_received | reported | | The number of metrics parsed
| receiver.datagrams_received                 | reported            |                              | The number of datagrams received
| receiver.batches_read                       | reported            |                              | The number of batches of datagrams read
| channel.avg | gauge (flush) | channel | The average of all samples in the flush interval
| channel.min | gauge (flush) | channel | The minimum sample seen
| channel.max | gauge (flush) | channel | The maximum sample seen
| channel.last | gauge (flush) | channel | The last sample seen
| channel.capacity | gauge (flush) | channel | The capacity of the channel
| channel.samples | gauge (flush) | channel | The number of samples seen (guaranteed to be at least 1)
| heartbeat | reported (flush) | version, commit | The value 1, tagged by the version (git tag) and short commit hash
| flusher.total_time | gauge (time) | | Time taken to flush all metrics to all backends for the flush interval
| backend.created | gauge (cumulative) | backend | Lifetime number of metric batches generated by the backend
| backend.create.failed | gauge (cumulative) | backend | Lifetime number of metric batches which failed to be serialized (DATALOSS!)
| backend.retried | gauge (sparse) | backend | Lifetime number of metric batches retried by the backend
| backend.dropped | gauge (cumulative) | backend | Lifetime number of metric batches dropped by the backend (DATALOSS!)
| backend.sent | gauge (cumulative) | backend | Lifetime number of metric batches successfully transmitted
| backend.series.sent | gauge (cumulative) | backend | Lifetime number of metric series successfully transmitted
| cloudprovider.aws.describeinstancecount | gauge (cumulative) | | The cumulative number of times DescribeInstancesPages has been called
| cloudprovider.aws.describeinstanceinstances | gauge (cumulative) | | The cumulative number of instances which have been fed in to DescribeInstancesPages
| cloudprovider.aws.describeinstancepages | gauge (cumulative) | | The cumulative number of pages from DescribeInstancesPages
| cloudprovider.aws.describeinstanceerrors | gauge (cumulative) | | The cumulative number of errors seen from DescribeInstancesPages
| cloudprovider.aws.describeinstancefound | gauge (cumulative) | | The cumulative number of instances successfully found via DescribeInstances
| cloudprovider.cache_positive | gauge (flush) | | The absolute number of positive entries in the cache
| cloudprovider.cache_negative | gauge (flush) | | The absolute number of negative entries in the cache
| cloudprovider.cache_refresh_positive | gauge (cumulative) | | The cumulative number of positive refreshes
| cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data
| cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache)
| cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses
| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up
| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete
| http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward
| http.forwarder.created | counter | | The number of batches prepared for forwarding
| http.forwarder.sent | counter | | The number of batches successfully forwarded
| http.forwarder.retried | counter | | The number of retries sending a batch
| http.forwarder.dropped | counter | | The number of batches dropped due to inability to forward upstream
| http.incoming | counter | server-name, result, failure | The number of batches forwarded to the server, and the results of processing them
| http.incoming.metrics | counter | server-name | The number of metrics received over http

| Tag | Description
| ------------- | -----------
| aggregator_id | The index of an aggregator; the number of aggregators corresponds to the --max-workers flag
1 change: 1 addition & 0 deletions cmd/gostatsd/main.go
@@ -176,6 +176,7 @@ func constructServer(v *viper.Viper) (*statsd.Server, error) {
DisabledSubTypes: gostatsd.DisabledSubMetrics(v),
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
HistogramLimit: v.GetUint32(gostatsd.ParamTimerHistogramLimit),
DisableInternalEvents: v.GetBool(gostatsd.ParamDisableInternalEvents),
Viper: v,
TransportPool: pool,
}, nil
5 changes: 5 additions & 0 deletions defaults_and_params.go
@@ -93,6 +93,8 @@ const (
DefaultTimerHistogramLimit = math.MaxUint32
// DefaultLogRawMetric is the default value for whether to log the metrics received from network
DefaultLogRawMetric = false
// DefaultDisableInternalEvents is the default value for whether to disable internal events
DefaultDisableInternalEvents = false
)

const (
@@ -172,6 +174,8 @@ const (
ParamTimerHistogramLimit = "timer-histogram-limit"
// ParamLogRawMetric enables custom metrics to be printed to stdout
ParamLogRawMetric = "log-raw-metric"
// ParamDisableInternalEvents disables sending internal events from gostatsd
ParamDisableInternalEvents = "disable-internal-events"
)

// AddFlags adds flags to the specified FlagSet.
@@ -213,6 +217,7 @@ func AddFlags(fs *pflag.FlagSet) {
fs.String(ParamHostname, getHost(), "overrides the hostname of the server")
fs.Uint32(ParamTimerHistogramLimit, DefaultTimerHistogramLimit, "upper limit of timer histogram buckets (MaxUint32 by default)")
fs.Bool(ParamLogRawMetric, DefaultLogRawMetric, "Print metrics received from network to stdout in JSON format")
fs.Bool(ParamDisableInternalEvents, DefaultDisableInternalEvents, "Disables sending internal events from gostatsd")
}

func minInt(a, b int) int {
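A usage sketch for the new toggle, assuming standard spf13 wiring; only `AddFlags`, the parameter name, and the `v.GetBool` lookup in `constructServer` come from this diff, the rest is scaffolding.

```go
package main

import (
	"fmt"

	"github.com/atlassian/gostatsd"
	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	// Register every gostatsd flag, including --disable-internal-events,
	// then bind the set to viper so the server config can read it back.
	fs := pflag.NewFlagSet("gostatsd", pflag.ContinueOnError)
	gostatsd.AddFlags(fs)
	if err := fs.Parse([]string{"--disable-internal-events"}); err != nil {
		panic(err)
	}

	v := viper.New()
	if err := v.BindPFlags(fs); err != nil {
		panic(err)
	}
	fmt.Println(v.GetBool(gostatsd.ParamDisableInternalEvents)) // true
}
```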
13 changes: 9 additions & 4 deletions pkg/stats/fixtures_test.go
@@ -9,10 +9,11 @@ import (
)

type countingStatser struct {
gauges uint64
counters uint64
timers uint64
events uint64
gauges uint64
counters uint64
reporters uint64
timers uint64
events uint64
}

func (cs *countingStatser) NotifyFlush(ctx context.Context, d time.Duration) {}
@@ -33,6 +34,10 @@ func (cs *countingStatser) Increment(name string, tags gostatsd.Tags) {
atomic.AddUint64(&cs.counters, 1)
}

func (cs *countingStatser) Report(name string, value *uint64, tags gostatsd.Tags) {
atomic.AddUint64(&cs.reporters, 1)
}

func (cs *countingStatser) TimingMS(name string, ms float64, tags gostatsd.Tags) {
atomic.AddUint64(&cs.timers, 1)
}
3 changes: 2 additions & 1 deletion pkg/stats/heartbeat.go
@@ -37,5 +37,6 @@ func (hb *HeartBeater) Run(ctx context.Context) {
}

func (hb *HeartBeater) emit(statser Statser) {
statser.Gauge(hb.metricName, 1, nil)
val := uint64(1)
statser.Report(hb.metricName, &val, nil)
}
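Note that `emit` builds a fresh `val` on every call, so the `SwapUint64` performed by `Report` in forwarder mode only zeroes a throwaway local: the heartbeat is still emitted with value 1 each interval in both modes, as a counter for forwarders and a gauge otherwise.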
1 change: 1 addition & 0 deletions pkg/stats/statser.go
@@ -19,6 +19,7 @@ type Statser interface {
Gauge(name string, value float64, tags gostatsd.Tags)
Count(name string, amount float64, tags gostatsd.Tags)
Increment(name string, tags gostatsd.Tags)
Report(name string, value *uint64, tags gostatsd.Tags)
TimingMS(name string, ms float64, tags gostatsd.Tags)
TimingDuration(name string, d time.Duration, tags gostatsd.Tags)
NewTimer(name string, tags gostatsd.Tags) *Timer
Expand Down
36 changes: 31 additions & 5 deletions pkg/stats/statser_internal.go
@@ -2,6 +2,7 @@ package stats

import (
"context"
"sync/atomic"
"time"

"github.com/atlassian/gostatsd"
@@ -23,20 +24,32 @@ type InternalStatser struct {
hostname gostatsd.Source
handler gostatsd.PipelineHandler

disableEvents bool
forwarderMode bool

consolidator *gostatsd.MetricConsolidator
}

// NewInternalStatser creates a new Statser which sends metrics to the
// supplied InternalHandler.
func NewInternalStatser(tags gostatsd.Tags, namespace string, hostname gostatsd.Source, handler gostatsd.PipelineHandler) *InternalStatser {
func NewInternalStatser(
tags gostatsd.Tags,
namespace string,
hostname gostatsd.Source,
handler gostatsd.PipelineHandler,
disableEvents bool,
forwarderMode bool,
) *InternalStatser {
if hostname != gostatsd.UnknownSource {
tags = tags.Concat(gostatsd.Tags{"host:" + string(hostname)})
}
return &InternalStatser{
tags: tags,
namespace: namespace,
hostname: hostname,
handler: handler,
tags: tags,
namespace: namespace,
hostname: hostname,
handler: handler,
disableEvents: disableEvents,
forwarderMode: forwarderMode,
// We can't just use a MetricMap because everything
// that writes to it is on its own goroutine.
consolidator: gostatsd.NewMetricConsolidator(10, false, 0, nil),
@@ -86,6 +99,16 @@ func (is *InternalStatser) Increment(name string, tags gostatsd.Tags) {
is.Count(name, 1, tags)
}

func (is *InternalStatser) Report(name string, value *uint64, tags gostatsd.Tags) {
if is.forwarderMode {
val := float64(atomic.SwapUint64(value, 0))
is.Count(name, val, tags)
} else {
val := float64(atomic.LoadUint64(value))
is.Gauge(name, val, tags)
}
}
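
A hypothetical caller, to show the contract this implies: the owning component keeps a cumulative `uint64`, bumps it atomically on the hot path, and hands the same pointer to `Report` on every flush. Everything below (the type, its methods, the flush hook) is invented for illustration.

```go
package stats

import "sync/atomic"

// datagramReceiver is an illustrative owner of a "reported" metric.
type datagramReceiver struct {
	datagramsReceived uint64 // cumulative; only Report ever resets it
	statser           Statser
}

// onDatagram is the hot path: just bump the counter, no statser call.
func (r *datagramReceiver) onDatagram() {
	atomic.AddUint64(&r.datagramsReceived, 1)
}

// onFlush passes the same pointer every time; Report decides whether to
// emit a cumulative gauge (standalone) or a swap-and-reset counter
// (forwarder mode).
func (r *datagramReceiver) onFlush() {
	r.statser.Report("receiver.datagrams_received", &r.datagramsReceived, nil)
}
```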

// TimingMS sends a timing metric from a millisecond value
func (is *InternalStatser) TimingMS(name string, ms float64, tags gostatsd.Tags) {
c := &gostatsd.Metric{
Expand Down Expand Up @@ -124,6 +147,9 @@ func (is *InternalStatser) dispatchMetric(metric *gostatsd.Metric) {
}

func (is *InternalStatser) Event(ctx context.Context, e *gostatsd.Event) {
if is.disableEvents {
return
}
is.handler.DispatchEvent(ctx, e)
}
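
Finally, a construction sketch tying the two new parameters together; the tag value, namespace, and `handler` are placeholders rather than values from this PR.

```go
package stats

import "github.com/atlassian/gostatsd"

// newForwarderStatser shows the updated constructor call end to end.
func newForwarderStatser(handler gostatsd.PipelineHandler) *InternalStatser {
	return NewInternalStatser(
		gostatsd.Tags{"service:gostatsd"}, // extra tags (placeholder)
		"statsd",                          // namespace (placeholder)
		gostatsd.UnknownSource,            // hostname; UnknownSource skips the host tag
		handler,
		true, // disableEvents: Event() drops internal events
		true, // forwarderMode: "reported" metrics flush as counters
	)
}
```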

