Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Provide a common way to filter metrics. #597

Merged
merged 2 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 9 additions & 33 deletions surfacers/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package cloudwatch
import (
"context"
"fmt"
"regexp"
"strconv"
"time"

Expand All @@ -42,33 +41,24 @@ const distributionDimensionName string = "le"

// CWSurfacer implements AWS Cloudwatch surfacer.
type CWSurfacer struct {
c *configpb.SurfacerConf
writeChan chan *metrics.EventMetrics
session *cloudwatch.CloudWatch
l *logger.Logger
allowedMetricsRegex *regexp.Regexp
c *configpb.SurfacerConf
opts *options.Options
writeChan chan *metrics.EventMetrics
session *cloudwatch.CloudWatch
l *logger.Logger

// A cache of []*cloudwatch.MetricDatum's, used for batch writing to the
// cloudwatch api.
cwMetricDatumCache []*cloudwatch.MetricDatum
}

func (cw *CWSurfacer) processIncomingMetrics(ctx context.Context) {
RoutineLoop:
for {
select {
case <-ctx.Done():
cw.l.Infof("Context canceled, stopping the surfacer write loop")
return
case em := <-cw.writeChan:

// evaluate if any of the metric labels are to be ignored and exit early
for _, label := range em.LabelsKeys() {
if cw.ignoreMetric(label) || cw.ignoreMetric(em.Label(label)) {
continue RoutineLoop
}
}

// check if a failure metric can be calculated
if em.Metric("success") != nil && em.Metric("total") != nil && em.Metric("failure") == nil {
if failure, err := calculateFailureMetric(em); err == nil {
Expand All @@ -88,6 +78,9 @@ RoutineLoop:
// each metric into a structure that is supported by Cloudwatch
func (cw *CWSurfacer) recordEventMetrics(em *metrics.EventMetrics) {
for _, metricKey := range em.MetricsKeys() {
if !cw.opts.AllowMetric(metricKey) {
continue
}

switch value := em.Metric(metricKey).(type) {
case metrics.NumValue:
Expand Down Expand Up @@ -152,16 +145,6 @@ func calculateFailureMetric(em *metrics.EventMetrics) (float64, error) {
return failure, nil
}

func (cw *CWSurfacer) ignoreMetric(name string) bool {
if cw.allowedMetricsRegex != nil {
if !cw.allowedMetricsRegex.MatchString(name) {
return true
}
}

return false
}

// Create a new cloudwatch metriddatum using the values passed in.
func (cw *CWSurfacer) newCWMetricDatum(metricname string, value float64, dimensions []*cloudwatch.Dimension, timestamp time.Time, latencyUnit time.Duration) *cloudwatch.MetricDatum {
// define the metric datum with default values
Expand Down Expand Up @@ -209,19 +192,12 @@ func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Optio

cw := &CWSurfacer{
c: config,
opts: opts,
writeChan: make(chan *metrics.EventMetrics, opts.MetricsBufferSize),
session: cloudwatch.New(sess),
l: l,
}

if cw.c.GetAllowedMetricsRegex() != "" {
r, err := regexp.Compile(cw.c.GetAllowedMetricsRegex())
if err != nil {
return nil, err
}
cw.allowedMetricsRegex = r
}

// Set the capacity of this slice to the max metric value, to avoid having to
// grow the slice.
cw.cwMetricDatumCache = make([]*cloudwatch.MetricDatum, 0, maxMetricDatums)
Expand Down
60 changes: 1 addition & 59 deletions surfacers/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cloudwatch
import (
"context"
"reflect"
"regexp"
"strings"
"testing"
"time"
Expand All @@ -35,8 +34,7 @@ func newTestCWSurfacer() CWSurfacer {
resolution := int64(60)

return CWSurfacer{
l: l,
allowedMetricsRegex: &regexp.Regexp{},
l: l,
c: &configpb.SurfacerConf{
Namespace: &namespace,
AllowedMetricsRegex: new(string),
Expand All @@ -45,62 +43,6 @@ func newTestCWSurfacer() CWSurfacer {
}
}

func TestIgnoreMetric(t *testing.T) {
tests := map[string]struct {
surfacer CWSurfacer
regex string
name string
want bool
}{
"regex default": {
surfacer: newTestCWSurfacer(),
regex: "",
name: "test",
want: false,
},
"regexp direct match": {
surfacer: newTestCWSurfacer(),
regex: "latency",
name: "latency",
want: false,
},
"regexp partial match inside optional": {
surfacer: newTestCWSurfacer(),
regex: ".*(http.*|ping).*",
name: "httphttp",
want: false,
},
"regex ignored": {
surfacer: newTestCWSurfacer(),
regex: ".*(http|ping).*",
name: "sysvar",
want: true,
},
"regex ignored partial": {
surfacer: newTestCWSurfacer(),
regex: ".*(http.*|ping).*",
name: "httsysvar",
want: true,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
r, err := regexp.Compile(tc.regex)
if err != nil {
t.Fatalf("Error compiling regex string: %s, error: %s", tc.regex, err)
}

tc.surfacer.allowedMetricsRegex = r

got := tc.surfacer.ignoreMetric(tc.name)
if got != tc.want {
t.Errorf("got: %t, want: %t", got, tc.want)
}
})
}
}

func TestEmLabelsToDimensions(t *testing.T) {
timestamp := time.Now()

Expand Down
133 changes: 133 additions & 0 deletions surfacers/common/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,140 @@
// Package options defines data structure for common surfacer options.
package options

import (
"fmt"
"regexp"

"github.com/google/cloudprober/metrics"
surfacerpb "github.com/google/cloudprober/surfacers/proto"
)

type labelFilter struct {
key string
value string
}

func (lf *labelFilter) matchEventMetrics(em *metrics.EventMetrics) bool {
if lf.key != "" {
for _, lKey := range em.LabelsKeys() {
if lf.key != lKey {
continue
}
if lf.value == "" {
return true
}
return lf.value == em.Label(lKey)
}
}
return false
}

func parseMetricsFilter(configs []*surfacerpb.LabelFilter) ([]*labelFilter, error) {
var filters []*labelFilter

for _, c := range configs {
lf := &labelFilter{
key: c.GetKey(),
value: c.GetValue(),
}

if lf.value != "" && lf.key == "" {
return nil, fmt.Errorf("key is required to match against val (%s)", c.GetValue())
}

filters = append(filters, lf)
}

return filters, nil
}

// Options encapsulates surfacer options common to all surfacers.
type Options struct {
MetricsBufferSize int

allowLabelFilters []*labelFilter
ignoreLabelFilters []*labelFilter
allowMetricName *regexp.Regexp
ignoreMetricName *regexp.Regexp
}

// AllowEventMetrics returns whether a certain EventMetrics should be allowed
// or not.
// TODO(manugarg): Explore if we can either log or increment some metric when
// we ignore an EventMetrics.
func (opts *Options) AllowEventMetrics(em *metrics.EventMetrics) bool {
if opts == nil {
return true
}

// If we match any ignore filter, return false immediately.
for _, ignoreF := range opts.ignoreLabelFilters {
if ignoreF.matchEventMetrics(em) {
return false
}
}

// If no allow filters are given, allow everything.
if len(opts.allowLabelFilters) == 0 {
return true
}

// If allow filters are given, allow only if match them.
for _, allowF := range opts.allowLabelFilters {
if allowF.matchEventMetrics(em) {
return true
}
}
return false
}

// AllowMetric returns whether a certain Metric should be allowed or not.
func (opts *Options) AllowMetric(metricName string) bool {
if opts == nil {
return true
}

if opts.ignoreMetricName != nil && opts.ignoreMetricName.MatchString(metricName) {
return false
}

if opts.allowMetricName == nil {
return true
}

return opts.allowMetricName.MatchString(metricName)
}

// BuildOptionsFromConfig builds surfacer options using config.
func BuildOptionsFromConfig(sdef *surfacerpb.SurfacerDef) (*Options, error) {
opts := &Options{
MetricsBufferSize: int(sdef.GetMetricsBufferSize()),
}

var err error
opts.allowLabelFilters, err = parseMetricsFilter(sdef.GetAllowMetricsWithLabel())
if err != nil {
return nil, err
}

opts.ignoreLabelFilters, err = parseMetricsFilter(sdef.GetIgnoreMetricsWithLabel())
if err != nil {
return nil, err
}

if sdef.GetAllowMetricsWithName() != "" {
opts.allowMetricName, err = regexp.Compile(sdef.GetAllowMetricsWithName())
if err != nil {
return nil, err
}
}

if sdef.GetIgnoreMetricsWithName() != "" {
opts.ignoreMetricName, err = regexp.Compile(sdef.GetIgnoreMetricsWithName())
if err != nil {
return nil, err
}
}

return opts, nil
}
Loading