diff --git a/metrics/dist.go b/metrics/dist.go index 53f0e19d..e36f2352 100644 --- a/metrics/dist.go +++ b/metrics/dist.go @@ -123,25 +123,52 @@ func (d *Distribution) AddFloat64(f float64) { // Add adds a distribution to the receiver distribution. If both distributions // don't have the same buckets, an error is returned. func (d *Distribution) Add(val Value) error { + _, err := d.addOrSubtract(val, false) + return err +} + +// SubtractCounter subtracts the provided "lastVal", assuming that value +// represents a counter, i.e. if "value" is less than "lastVal", we assume that +// counter has been reset and don't subtract. +func (d *Distribution) SubtractCounter(lastVal Value) (bool, error) { + return d.addOrSubtract(lastVal, true) +} + +func (d *Distribution) addOrSubtract(val Value, subtract bool) (bool, error) { delta, ok := val.(*Distribution) if !ok { - return errors.New("incompatible value to add to distribution") + return false, errors.New("dist: incompatible value to add or subtract") } if !reflect.DeepEqual(d.lowerBounds, delta.lowerBounds) { - return fmt.Errorf("incompatible delta value, Bucket lower bounds in receiver distribution: %v, and in delta distribution: %v", d.lowerBounds, delta.lowerBounds) + return false, fmt.Errorf("incompatible delta value, Bucket lower bounds in receiver distribution: %v, and in delta distribution: %v", d.lowerBounds, delta.lowerBounds) } d.mu.Lock() defer d.mu.Unlock() delta.mu.RLock() defer delta.mu.RUnlock() + if subtract { + // If receiver count is less than lastVal' count, assume reset and return. + if d.count < delta.count { + return true, nil + } + d.count -= delta.count + d.sum -= delta.sum + } else { + d.count += delta.count + d.sum += delta.sum + } + for i := 0; i < len(d.bucketCounts); i++ { - d.bucketCounts[i] += delta.bucketCounts[i] + if subtract { + d.bucketCounts[i] -= delta.bucketCounts[i] + } else { + d.bucketCounts[i] += delta.bucketCounts[i] + } } - d.count += delta.count - d.sum += delta.sum - return nil + + return false, nil } // String returns a string representation of the distribution: diff --git a/metrics/dist_test.go b/metrics/dist_test.go index c8008da8..58ebb438 100644 --- a/metrics/dist_test.go +++ b/metrics/dist_test.go @@ -139,6 +139,25 @@ func TestDistAdd(t *testing.T) { verifyBucketCount(t, d, []int{0, 1, 2, 3, 4, 5}, []int64{1, 2, 0, 2, 0, 1}) } +func TestDistSubtractCounter(t *testing.T) { + lb := []float64{1, 5, 15, 30, 45} + d := NewDistribution(lb) + + for _, s := range []float64{0.5, 4, 17} { + d.AddSample(s) + } + + d2 := d.Clone().(*Distribution) + for _, s := range []float64{3.5, 21, 300} { + d2.AddSample(s) + } + + if wasReset, err := d2.SubtractCounter(d); err != nil || wasReset { + t.Errorf("SubtractCounter error: %v, wasReset: %v", err, wasReset) + } + verifyBucketCount(t, d2, []int{0, 1, 2, 3, 4, 5}, []int64{0, 1, 0, 1, 0, 1}) +} + func TestDistData(t *testing.T) { lb := []float64{1, 5, 15, 30, 45} d := NewDistribution(lb) diff --git a/metrics/eventmetrics.go b/metrics/eventmetrics.go index aa689f8a..e31a3945 100644 --- a/metrics/eventmetrics.go +++ b/metrics/eventmetrics.go @@ -186,6 +186,38 @@ func (em *EventMetrics) Update(in *EventMetrics) error { } } +// SubtractLast subtracts the provided (last) EventMetrics from the receiver +// EventMetrics and return the result as a GAUGE EventMetrics. +func (em *EventMetrics) SubtractLast(lastEM *EventMetrics) (*EventMetrics, error) { + if em.Kind != CUMULATIVE || lastEM.Kind != CUMULATIVE { + return nil, fmt.Errorf("incorrect eventmetrics kind (current: %v, last: %v), SubtractLast works only for CUMULATIVE metrics", em.Kind, lastEM.Kind) + } + + gaugeEM := em.Clone() + gaugeEM.Kind = GAUGE + + for name, lastVal := range lastEM.metrics { + val, ok := gaugeEM.metrics[name] + if !ok { + return nil, fmt.Errorf("receiver EventMetrics doesn't have %s metric", name) + } + wasReset, err := val.SubtractCounter(lastVal) + if err != nil { + return nil, err + } + + // If any metric is reset, consider it a full reset of EventMetrics. + // TODO(manugarg): See if we can track this event somehow. + if wasReset { + gaugeEM := em.Clone() + gaugeEM.Kind = GAUGE + return gaugeEM, nil + } + } + + return gaugeEM, nil +} + // String returns the string representation of the EventMetrics. // Note that this is compatible with what vmwatcher understands. // Example output string: @@ -217,3 +249,16 @@ func (em *EventMetrics) String() string { } return b.String() } + +// Key returns a string key that uniquely identifies an eventmetrics. +func (em *EventMetrics) Key() string { + em.mu.RLock() + defer em.mu.RUnlock() + + var keys []string + keys = append(keys, em.metricsKeys...) + for _, k := range em.LabelsKeys() { + keys = append(keys, k+"="+em.labels[k]) + } + return strings.Join(keys, ",") +} diff --git a/metrics/eventmetrics_test.go b/metrics/eventmetrics_test.go index 1da6cf03..1b5c5aea 100644 --- a/metrics/eventmetrics_test.go +++ b/metrics/eventmetrics_test.go @@ -113,6 +113,68 @@ func TestEventMetricsUpdate(t *testing.T) { } } +func TestEventMetricsSubtractCounters(t *testing.T) { + rttVal := NewInt(0) + rttVal.Str = func(i int64) string { + return fmt.Sprintf("%.3f", float64(i)/1000) + } + m := newEventMetrics(10, 10, 1000, make(map[string]int64)) + m.AddLabel("ptype", "http") + + // First run + m2 := newEventMetrics(32, 22, 220100, map[string]int64{ + "200": 22, + }) + gEM, err := m2.SubtractLast(m) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + verifyEventMetrics(t, gEM, 22, 12, 219100, map[string]int64{ + "200": 22, + }) + + // Second run + m3 := newEventMetrics(42, 31, 300100, map[string]int64{ + "200": 24, + "204": 8, + }) + + gEM, err = m3.SubtractLast(m2) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + verifyEventMetrics(t, gEM, 10, 9, 80000, map[string]int64{ + "200": 2, + "204": 8, + }) + + // Third run, expect reset + m4 := newEventMetrics(10, 8, 1100, map[string]int64{ + "200": 8, + }) + gEM, err = m4.SubtractLast(m3) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + verifyEventMetrics(t, gEM, 10, 8, 1100, map[string]int64{ + "200": 8, + }) +} + +func TestKey(t *testing.T) { + m := newEventMetrics(42, 31, 300100, map[string]int64{ + "200": 24, + "204": 8, + }).AddLabel("probe", "google-homepage") + + key := m.Key() + wantKey := "sent,rcvd,rtt,resp-code,probe=google-homepage" + + if key != wantKey { + t.Errorf("Got key: %s, wanted: %s", key, wantKey) + } +} + func BenchmarkEventMetricsStringer(b *testing.B) { em := newEventMetrics(32, 22, 220100, map[string]int64{ "200": 22, diff --git a/metrics/float.go b/metrics/float.go index 042bbfd5..87fbbf98 100644 --- a/metrics/float.go +++ b/metrics/float.go @@ -73,6 +73,22 @@ func (f *Float) Add(val Value) error { return nil } +// SubtractCounter subtracts the provided "lastVal", assuming that value +// represents a counter, i.e. if "value" is less than "lastVal", we assume that +// counter has been reset and don't subtract. +func (f *Float) SubtractCounter(lastVal Value) (bool, error) { + lv, ok := lastVal.(*Float) + if !ok { + return false, errors.New("incompatible value to add") + } + if f.f < lv.f { + return true, nil + } + + f.f -= lv.f + return false, nil +} + // AddInt64 adds an int64 to the receiver Float. func (f *Float) AddInt64(i int64) { f.f += float64(i) diff --git a/metrics/int.go b/metrics/int.go index cd2900c8..fcb89c67 100644 --- a/metrics/int.go +++ b/metrics/int.go @@ -74,6 +74,23 @@ func (i *Int) Add(val Value) error { return nil } +// SubtractCounter subtracts the provided "lastVal", assuming that value +// represents a counter, i.e. if "value" is less than "lastVal", we assume that +// counter has been reset and don't subtract. +func (i *Int) SubtractCounter(lastVal Value) (bool, error) { + lv, ok := lastVal.(*Int) + if !ok { + return false, errors.New("incompatible value to subtract") + } + + if i.i < lv.i { + return true, nil + } + + i.i -= lv.i + return false, nil +} + // AddInt64 adds an int64 to the receiver Int. func (i *Int) AddInt64(ii int64) { i.i += ii @@ -93,8 +110,9 @@ func (i *Int) String() string { return strconv.FormatInt(i.Int64(), 10) } -// AtomicInt implements NumValue with int64 storage and atomic operations. If concurrency-safety -// is not a requirement, e.g. for use in already mutex protected map, you could use Int. +// AtomicInt implements NumValue with int64 storage and atomic operations. If +// concurrency-safety is not a requirement, e.g. for use in already mutex +// protected map, you could use Int. type AtomicInt struct { i int64 // If Str is defined, this is method used to convert AtomicInt into a string. @@ -147,6 +165,28 @@ func (i *AtomicInt) Add(val Value) error { return nil } +// SubtractCounter subtracts the provided "lastVal". Note that this function +// is not fully atomic: we first load the values, compare them, and then update +// the receiver if required. There is a possibility that either receiver, or +// lastVal may change between loading of the values and updating them. We +// should still not get negative values though, as we use the snapshots to +// finally update the value. +func (i *AtomicInt) SubtractCounter(lastVal Value) (bool, error) { + lv, ok := lastVal.(NumValue) + if !ok { + return false, errors.New("incompatible value to subtract") + } + + valS := i.Int64() + lvS := lv.Int64() + + if valS < lvS { + return true, nil + } + atomic.StoreInt64(&i.i, valS-lvS) + return false, nil +} + // AddInt64 adds an int64 to the receiver Int. func (i *AtomicInt) AddInt64(ii int64) { atomic.AddInt64(&i.i, ii) diff --git a/metrics/map.go b/metrics/map.go index 73392619..e57defa9 100644 --- a/metrics/map.go +++ b/metrics/map.go @@ -23,8 +23,8 @@ import ( "sync" ) -// Map implements a key-value store where keys are of type string and values are -// of type NumValue. +// Map implements a key-value store where keys are of type string and values +// are of type NumValue. // It satisfies the Value interface. type Map struct { MapName string // Map key name @@ -32,6 +32,9 @@ type Map struct { m map[string]NumValue keys []string + // total is only used to figure out if counter is moving up or down (reset). + total NumValue + // We use this to initialize new keys defaultKeyValue NumValue } @@ -42,12 +45,11 @@ func NewMap(mapName string, defaultValue NumValue) *Map { MapName: mapName, defaultKeyValue: defaultValue, m: make(map[string]NumValue), + total: defaultValue.Clone().(NumValue), } } // GetKey returns the given key's value. -// TODO(manugarg): We should probably add a way to get the list of all the keys in the -// map. func (m *Map) GetKey(key string) NumValue { m.mu.RLock() defer m.mu.RUnlock() @@ -63,6 +65,7 @@ func (m *Map) Clone() Value { MapName: m.MapName, defaultKeyValue: m.defaultKeyValue.Clone().(NumValue), m: make(map[string]NumValue), + total: m.total.Clone().(NumValue), } newMap.keys = make([]string, len(m.keys)) for i, k := range m.keys { @@ -86,6 +89,7 @@ func (m *Map) newKey(key string) { m.keys = append(m.keys, key) sort.Strings(m.keys) m.m[key] = m.defaultKeyValue.Clone().(NumValue) + m.total.IncBy(m.defaultKeyValue) } // IncKey increments the given key's value by one. @@ -96,6 +100,7 @@ func (m *Map) IncKey(key string) { m.newKey(key) } m.m[key].Inc() + m.total.Inc() } // IncKeyBy increments the given key's value by NumValue. @@ -106,33 +111,61 @@ func (m *Map) IncKeyBy(key string, delta NumValue) { m.newKey(key) } m.m[key].IncBy(delta) + m.total.IncBy(delta) } -// Add adds a value (type Value) to the receiver Map. A non-Map value returns an error. -// This is part of the Value interface. +// Add adds a value (type Value) to the receiver Map. A non-Map value returns +// an error. This is part of the Value interface. func (m *Map) Add(val Value) error { + _, err := m.addOrSubtract(val, false) + return err +} + +// SubtractCounter subtracts the provided "lastVal", assuming that value +// represents a counter, i.e. if "value" is less than "lastVal", we assume that +// counter has been reset and don't subtract. +func (m *Map) SubtractCounter(lastVal Value) (bool, error) { + return m.addOrSubtract(lastVal, true) +} + +func (m *Map) addOrSubtract(val Value, subtract bool) (bool, error) { delta, ok := val.(*Map) if !ok { - return errors.New("incompatible value to add") + return false, errors.New("incompatible value to add or subtract") } + m.mu.Lock() defer m.mu.Unlock() delta.mu.RLock() defer delta.mu.RUnlock() + + if subtract && (m.total.Float64() < delta.total.Float64()) { + return true, nil + } + var sortRequired bool for k, v := range delta.m { - if m.m[k] == nil { - sortRequired = true - m.keys = append(m.keys, k) - m.m[k] = v - continue + if subtract { + // If a key is there in delta (lastVal) but not in the current val, + // assume metric has been reset. + if m.m[k] == nil { + return true, nil + } + m.m[k].SubtractCounter(v) + } else { + if m.m[k] == nil { + sortRequired = true + m.keys = append(m.keys, k) + m.m[k] = v + continue + } + m.m[k].Add(v) } - m.m[k].Add(v) } if sortRequired { sort.Strings(m.keys) } - return nil + return false, nil } // AddInt64 generates a panic for the Map type. This is added only to satisfy diff --git a/metrics/map_test.go b/metrics/map_test.go index 0a83dbb3..2dbe4100 100644 --- a/metrics/map_test.go +++ b/metrics/map_test.go @@ -20,6 +20,8 @@ import ( ) func verify(t *testing.T, m *Map, expectedKeys []string, expectedMap map[string]int64) { + t.Helper() + if !reflect.DeepEqual(m.Keys(), expectedKeys) { t.Errorf("Map doesn't have expected keys. Got: %q, Expected: %q", m.Keys(), expectedKeys) } @@ -75,6 +77,47 @@ func TestMap(t *testing.T) { }) } +func TestMapSubtractCounter(t *testing.T) { + m1 := NewMap("code", NewInt(0)) + m1.IncKeyBy("200", NewInt(4000)) + m1.IncKeyBy("403", NewInt(2)) + + m2 := m1.Clone().(*Map) + m2.IncKeyBy("200", NewInt(400)) + m2.IncKey("500") + m2Clone := m2.Clone() // We'll use this for reset testing below. + + expectReset := false + wasReset, err := m2.SubtractCounter(m1) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if wasReset != expectReset { + t.Errorf("wasReset=%v, expected=%v", wasReset, expectReset) + } + verify(t, m2, []string{"200", "403", "500"}, map[string]int64{ + "200": 400, + "403": 0, + "500": 1, + }) + + // Expect a reset this time, as m3 (m) will be smaller than m2Clone. + m3 := m1.Clone() + expectReset = true + wasReset, err = m3.SubtractCounter(m2Clone) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if wasReset != expectReset { + t.Errorf("wasReset=%v, expected=%v", wasReset, expectReset) + } + verify(t, m3.(*Map), []string{"200", "403"}, map[string]int64{ + "200": 4000, + "403": 2, + }) + +} + func TestMapString(t *testing.T) { m := NewMap("lat", NewFloat(0)) m.IncKeyBy("p99", NewFloat(4000)) diff --git a/metrics/metrics.go b/metrics/metrics.go index 351e2b5b..f363a771 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -28,6 +28,11 @@ type Value interface { AddInt64(i int64) AddFloat64(f float64) String() string + + // SubtractCounter subtracts the provided "lastVal", assuming that value + // represents a counter, i.e. if "value" is less than the "lastVal", we + // assume that counter has been reset and don't subtract. + SubtractCounter(last Value) (wasReset bool, err error) } // NumValue represents any numerical metric value, e.g. Int, Float. diff --git a/metrics/string.go b/metrics/string.go index 13d440bf..40a4e25d 100644 --- a/metrics/string.go +++ b/metrics/string.go @@ -33,6 +33,12 @@ func (s String) Add(val Value) error { return errors.New("string value type doesn't support Add() operation") } +// SubtractCounter isn't supported for the String type, this is only to satisfy +// the Value interface. +func (s String) SubtractCounter(val Value) (bool, error) { + return false, errors.New("string value type doesn't support SubtractCounter() operation") +} + // AddInt64 generates a panic for the String type. This is added only to satisfy // the Value interface. func (s String) AddInt64(i int64) { diff --git a/surfacers/common/options/options.go b/surfacers/common/options/options.go index f16ddccd..99091741 100644 --- a/surfacers/common/options/options.go +++ b/surfacers/common/options/options.go @@ -19,6 +19,7 @@ import ( "fmt" "regexp" + "github.com/google/cloudprober/logger" "github.com/google/cloudprober/metrics" surfacerpb "github.com/google/cloudprober/surfacers/proto" ) @@ -65,11 +66,15 @@ func parseMetricsFilter(configs []*surfacerpb.LabelFilter) ([]*labelFilter, erro // Options encapsulates surfacer options common to all surfacers. type Options struct { MetricsBufferSize int + Config *surfacerpb.SurfacerDef + Logger *logger.Logger allowLabelFilters []*labelFilter ignoreLabelFilters []*labelFilter allowMetricName *regexp.Regexp ignoreMetricName *regexp.Regexp + + AddFailureMetric bool } // AllowEventMetrics returns whether a certain EventMetrics should be allowed @@ -120,8 +125,10 @@ func (opts *Options) AllowMetric(metricName string) bool { } // BuildOptionsFromConfig builds surfacer options using config. -func BuildOptionsFromConfig(sdef *surfacerpb.SurfacerDef) (*Options, error) { +func BuildOptionsFromConfig(sdef *surfacerpb.SurfacerDef, l *logger.Logger) (*Options, error) { opts := &Options{ + Config: sdef, + Logger: l, MetricsBufferSize: int(sdef.GetMetricsBufferSize()), } @@ -150,5 +157,10 @@ func BuildOptionsFromConfig(sdef *surfacerpb.SurfacerDef) (*Options, error) { } } + opts.AddFailureMetric = opts.Config.GetAddFailureMetric() + if opts.Config.AddFailureMetric == nil && opts.Config.GetType() == surfacerpb.Type_STACKDRIVER { + opts.AddFailureMetric = true + } + return opts, nil } diff --git a/surfacers/common/options/options_test.go b/surfacers/common/options/options_test.go index 18045a23..a068295a 100644 --- a/surfacers/common/options/options_test.go +++ b/surfacers/common/options/options_test.go @@ -97,7 +97,7 @@ func TestAllowEventMetrics(t *testing.T) { config.AllowMetricsWithLabel = append(config.AllowMetricsWithLabel, &configpb.LabelFilter{Key: proto.String(allowF[0]), Value: proto.String(allowF[1])}) } - opts, err := BuildOptionsFromConfig(config) + opts, err := BuildOptionsFromConfig(config, nil) if err != nil { if !test.wantErr { t.Errorf("Unexpected building options from the config: %v", err) @@ -170,7 +170,7 @@ func TestAllowMetric(t *testing.T) { AllowMetricsWithName: proto.String(test.allow), } - opts, err := BuildOptionsFromConfig(config) + opts, err := BuildOptionsFromConfig(config, nil) if err != nil { if !test.wantErr { t.Errorf("Unexpected building options from the config: %v", err) diff --git a/surfacers/common/transform/transform.go b/surfacers/common/transform/transform.go new file mode 100644 index 00000000..f9d88a64 --- /dev/null +++ b/surfacers/common/transform/transform.go @@ -0,0 +1,69 @@ +// Copyright 2021 The Cloudprober Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package transform implements some transformations for metrics before we +// export them. +package transform + +import ( + "fmt" + + "github.com/google/cloudprober/logger" + "github.com/google/cloudprober/metrics" +) + +// AddFailureMetric adds failure metric to the EventMetrics based on the +// config options. +func AddFailureMetric(em *metrics.EventMetrics, l *logger.Logger) { + tv, sv, fv := em.Metric("total"), em.Metric("success"), em.Metric("failure") + // If there is already a failure metric, or if "total" and "success" metrics + // are not available, don't compute failure metric. + if fv != nil || tv == nil || sv == nil { + return + } + + total, totalOk := tv.(metrics.NumValue) + success, successOk := sv.(metrics.NumValue) + if !totalOk || !successOk { + l.Errorf("total (%v) and success (%v) values are not numeric, this should never happen", tv, sv) + return + } + + em.AddMetric("failure", metrics.NewInt(total.Int64()-success.Int64())) +} + +// CumulativeToGauge creates a "gauge" EventMetrics from a "cumulative" +// EventMetrics using a cache. It looks for the EventMetrics in the given cache +// and if it exists already, it subtracts the current values from the cached +// values. +func CumulativeToGauge(em *metrics.EventMetrics, lvCache map[string]*metrics.EventMetrics, l *logger.Logger) (*metrics.EventMetrics, error) { + key := em.Key() + + lastEM, ok := lvCache[key] + // Cache a copy of "em" as some fields like maps and dist can be shared + // across successive "em" writes. + lvCache[key] = em.Clone() + + // If it is the first time for this EventMetrics, return it as it is. + if !ok { + return em, nil + } + + gaugeEM, err := em.SubtractLast(lastEM) + if err != nil { + return nil, fmt.Errorf("error subtracting cached metrics from current metrics: %v", err) + } + + return gaugeEM, nil +} diff --git a/surfacers/proto/config.pb.go b/surfacers/proto/config.pb.go index 52f29745..4c356be8 100644 --- a/surfacers/proto/config.pb.go +++ b/surfacers/proto/config.pb.go @@ -201,6 +201,17 @@ type SurfacerDef struct { // CLOUDWATCH, PROMETHEUS, STACKDRIVER AllowMetricsWithName *string `protobuf:"bytes,6,opt,name=allow_metrics_with_name,json=allowMetricsWithName" json:"allow_metrics_with_name,omitempty"` IgnoreMetricsWithName *string `protobuf:"bytes,7,opt,name=ignore_metrics_with_name,json=ignoreMetricsWithName" json:"ignore_metrics_with_name,omitempty"` + // Whether to add failure metric or not. For stackdriver surfacer, we add + // failure metric by default. + AddFailureMetric *bool `protobuf:"varint,8,opt,name=add_failure_metric,json=addFailureMetric" json:"add_failure_metric,omitempty"` + // If set to true, cloudprober will export all metrics as gauge metrics. Note + // that cloudprober inherently generates only cumulative metrics. To create + // gauge metrics from cumulative metrics, we keep a copy of the old metrics + // and subtract new metrics from the previous metrics. This transformation in + // metrics has an increased memory-overhead because extra copies required. + // However, it should not be noticeable unless you're producing large number + // of metrics (say > 10000 metrics per second). + ExportAsGauge *bool `protobuf:"varint,9,opt,name=export_as_gauge,json=exportAsGauge" json:"export_as_gauge,omitempty"` // Matching surfacer specific configuration (one for each type in the above // enum) // @@ -300,6 +311,20 @@ func (x *SurfacerDef) GetIgnoreMetricsWithName() string { return "" } +func (x *SurfacerDef) GetAddFailureMetric() bool { + if x != nil && x.AddFailureMetric != nil { + return *x.AddFailureMetric + } + return false +} + +func (x *SurfacerDef) GetExportAsGauge() bool { + if x != nil && x.ExportAsGauge != nil { + return *x.ExportAsGauge + } + return false +} + func (m *SurfacerDef) GetSurfacer() isSurfacerDef_Surfacer { if m != nil { return m.Surfacer @@ -426,7 +451,7 @@ var file_github_com_google_cloudprober_surfacers_proto_config_proto_rawDesc = [] 0x6f, 0x22, 0x35, 0x0a, 0x0b, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xe9, 0x07, 0x0a, 0x0b, 0x53, 0x75, 0x72, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xbf, 0x08, 0x0a, 0x0b, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x44, 0x65, 0x66, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1a, 0x2e, 0x63, 0x6c, 0x6f, @@ -453,53 +478,59 @@ var file_github_com_google_cloudprober_surfacers_proto_config_proto_rawDesc = [] 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x5f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x5f, 0x77, 0x69, 0x74, 0x68, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x69, 0x67, 0x6e, 0x6f, 0x72, 0x65, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x57, 0x69, 0x74, 0x68, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x60, 0x0a, - 0x13, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x5f, 0x73, 0x75, 0x72, 0x66, - 0x61, 0x63, 0x65, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x63, 0x6c, 0x6f, - 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, - 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2e, 0x53, 0x75, 0x72, - 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x12, 0x70, 0x72, 0x6f, - 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, - 0x63, 0x0a, 0x14, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x5f, 0x73, - 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, - 0x61, 0x63, 0x65, 0x72, 0x2e, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, - 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, - 0x13, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x53, 0x75, 0x72, 0x66, - 0x61, 0x63, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0d, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x73, 0x75, 0x72, - 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, - 0x65, 0x72, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, - 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x53, 0x75, 0x72, 0x66, - 0x61, 0x63, 0x65, 0x72, 0x12, 0x5a, 0x0a, 0x11, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, - 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x2b, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, - 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x2e, - 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x10, - 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, - 0x12, 0x54, 0x0a, 0x0f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, - 0x63, 0x65, 0x72, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x6c, 0x6f, 0x75, + 0x74, 0x72, 0x69, 0x63, 0x73, 0x57, 0x69, 0x74, 0x68, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2c, 0x0a, + 0x12, 0x61, 0x64, 0x64, 0x5f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x61, 0x64, 0x64, 0x46, 0x61, + 0x69, 0x6c, 0x75, 0x72, 0x65, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x12, 0x26, 0x0a, 0x0f, 0x65, + 0x78, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x61, 0x73, 0x5f, 0x67, 0x61, 0x75, 0x67, 0x65, 0x18, 0x09, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x65, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x41, 0x73, 0x47, 0x61, + 0x75, 0x67, 0x65, 0x12, 0x60, 0x0a, 0x13, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, + 0x73, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x2d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, + 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, + 0x75, 0x73, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, + 0x00, 0x52, 0x12, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x53, 0x75, 0x72, + 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x63, 0x0a, 0x14, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x64, 0x72, + 0x69, 0x76, 0x65, 0x72, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0b, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, + 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x73, 0x74, 0x61, 0x63, 0x6b, + 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, + 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x13, 0x73, 0x74, 0x61, 0x63, 0x6b, 0x64, 0x72, 0x69, 0x76, + 0x65, 0x72, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x4e, 0x0a, 0x0d, 0x66, 0x69, + 0x6c, 0x65, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0c, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x27, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, + 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x2e, 0x53, 0x75, + 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x0c, 0x66, 0x69, + 0x6c, 0x65, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x5a, 0x0a, 0x11, 0x70, 0x6f, + 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, + 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, + 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x70, 0x6f, 0x73, + 0x74, 0x67, 0x72, 0x65, 0x73, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, + 0x6e, 0x66, 0x48, 0x00, 0x52, 0x10, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x53, 0x75, + 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, + 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x29, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, + 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x53, 0x75, + 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x75, + 0x62, 0x73, 0x75, 0x62, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x60, 0x0a, 0x13, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x77, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, + 0x63, 0x65, 0x72, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, - 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, - 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x0e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x53, 0x75, - 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x12, 0x60, 0x0a, 0x13, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x77, - 0x61, 0x74, 0x63, 0x68, 0x5f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x18, 0x0f, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, - 0x72, 0x2e, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, - 0x77, 0x61, 0x74, 0x63, 0x68, 0x2e, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, - 0x6e, 0x66, 0x48, 0x00, 0x52, 0x12, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x77, 0x61, 0x74, 0x63, 0x68, - 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x42, 0x0a, 0x0a, 0x08, 0x73, 0x75, 0x72, 0x66, - 0x61, 0x63, 0x65, 0x72, 0x2a, 0x77, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, - 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x50, 0x52, 0x4f, 0x4d, 0x45, 0x54, - 0x48, 0x45, 0x55, 0x53, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x54, 0x41, 0x43, 0x4b, 0x44, - 0x52, 0x49, 0x56, 0x45, 0x52, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x46, 0x49, 0x4c, 0x45, 0x10, - 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x04, 0x12, - 0x0a, 0x0a, 0x06, 0x50, 0x55, 0x42, 0x53, 0x55, 0x42, 0x10, 0x05, 0x12, 0x0e, 0x0a, 0x0a, 0x43, - 0x4c, 0x4f, 0x55, 0x44, 0x57, 0x41, 0x54, 0x43, 0x48, 0x10, 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x55, - 0x53, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x46, 0x49, 0x4e, 0x45, 0x44, 0x10, 0x63, 0x42, 0x2f, 0x5a, - 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x72, 0x2f, 0x73, - 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x77, 0x61, 0x74, 0x63, 0x68, 0x2e, 0x53, 0x75, 0x72, 0x66, + 0x61, 0x63, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x48, 0x00, 0x52, 0x12, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x77, 0x61, 0x74, 0x63, 0x68, 0x53, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x42, 0x0a, + 0x0a, 0x08, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x2a, 0x77, 0x0a, 0x04, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x4f, 0x4e, 0x45, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, + 0x50, 0x52, 0x4f, 0x4d, 0x45, 0x54, 0x48, 0x45, 0x55, 0x53, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, + 0x53, 0x54, 0x41, 0x43, 0x4b, 0x44, 0x52, 0x49, 0x56, 0x45, 0x52, 0x10, 0x02, 0x12, 0x08, 0x0a, + 0x04, 0x46, 0x49, 0x4c, 0x45, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x4f, 0x53, 0x54, 0x47, + 0x52, 0x45, 0x53, 0x10, 0x04, 0x12, 0x0a, 0x0a, 0x06, 0x50, 0x55, 0x42, 0x53, 0x55, 0x42, 0x10, + 0x05, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x4c, 0x4f, 0x55, 0x44, 0x57, 0x41, 0x54, 0x43, 0x48, 0x10, + 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x55, 0x53, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x46, 0x49, 0x4e, 0x45, + 0x44, 0x10, 0x63, 0x42, 0x2f, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, + 0x6f, 0x62, 0x65, 0x72, 0x2f, 0x73, 0x75, 0x72, 0x66, 0x61, 0x63, 0x65, 0x72, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, } var ( diff --git a/surfacers/proto/config.proto b/surfacers/proto/config.proto index 309559a1..9dbcc22d 100644 --- a/surfacers/proto/config.proto +++ b/surfacers/proto/config.proto @@ -75,6 +75,19 @@ message SurfacerDef { optional string allow_metrics_with_name = 6; optional string ignore_metrics_with_name = 7; + // Whether to add failure metric or not. For stackdriver surfacer, we add + // failure metric by default. + optional bool add_failure_metric = 8; + + // If set to true, cloudprober will export all metrics as gauge metrics. Note + // that cloudprober inherently generates only cumulative metrics. To create + // gauge metrics from cumulative metrics, we keep a copy of the old metrics + // and subtract new metrics from the previous metrics. This transformation in + // metrics has an increased memory-overhead because extra copies required. + // However, it should not be noticeable unless you're producing large number + // of metrics (say > 10000 metrics per second). + optional bool export_as_gauge = 9; + // Matching surfacer specific configuration (one for each type in the above // enum) oneof surfacer { diff --git a/surfacers/surfacers.go b/surfacers/surfacers.go index f20c08ad..4d8a474b 100644 --- a/surfacers/surfacers.go +++ b/surfacers/surfacers.go @@ -34,6 +34,7 @@ import ( "github.com/google/cloudprober/metrics" "github.com/google/cloudprober/surfacers/cloudwatch" "github.com/google/cloudprober/surfacers/common/options" + "github.com/google/cloudprober/surfacers/common/transform" "github.com/google/cloudprober/surfacers/file" "github.com/google/cloudprober/surfacers/postgres" "github.com/google/cloudprober/surfacers/prometheus" @@ -95,13 +96,29 @@ type Surfacer interface { type surfacerWrapper struct { Surfacer - opts *options.Options + opts *options.Options + lvCache map[string]*metrics.EventMetrics } func (sw *surfacerWrapper) Write(ctx context.Context, em *metrics.EventMetrics) { - if sw.opts.AllowEventMetrics(em) { - sw.Surfacer.Write(ctx, em) + if !sw.opts.AllowEventMetrics(em) { + return } + + if sw.opts.AddFailureMetric { + transform.AddFailureMetric(em, sw.opts.Logger) + } + + if sw.opts.Config.GetExportAsGauge() && em.Kind == metrics.CUMULATIVE { + newEM, err := transform.CumulativeToGauge(em, sw.lvCache, sw.opts.Logger) + if err != nil { + sw.opts.Logger.Errorf("Error converting CUMULATIVE metrics to GAUGE: %v", err) + return + } + em = newEM + } + + sw.Surfacer.Write(ctx, em) } // SurfacerInfo encapsulates a Surfacer and related info. @@ -144,7 +161,7 @@ func initSurfacer(ctx context.Context, s *surfacerpb.SurfacerDef, sType surfacer return nil, nil, fmt.Errorf("unable to create cloud logger: %v", err) } - opts, err := options.BuildOptionsFromConfig(s) + opts, err := options.BuildOptionsFromConfig(s, l) if err != nil { return nil, nil, err } @@ -185,6 +202,7 @@ func initSurfacer(ctx context.Context, s *surfacerpb.SurfacerDef, sType surfacer return &surfacerWrapper{ Surfacer: surfacer, opts: opts, + lvCache: make(map[string]*metrics.EventMetrics), }, conf, err }