Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Add a way to export gauge metrics and other changes. #604

Merged
merged 2 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions metrics/dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,52 @@ func (d *Distribution) AddFloat64(f float64) {
// Add adds a distribution to the receiver distribution. If both distributions
// don't have the same buckets, an error is returned.
func (d *Distribution) Add(val Value) error {
_, err := d.addOrSubtract(val, false)
return err
}

// SubtractCounter subtracts the provided "lastVal", assuming that value
// represents a counter, i.e. if "value" is less than "lastVal", we assume that
// counter has been reset and don't subtract.
func (d *Distribution) SubtractCounter(lastVal Value) (bool, error) {
return d.addOrSubtract(lastVal, true)
}

func (d *Distribution) addOrSubtract(val Value, subtract bool) (bool, error) {
delta, ok := val.(*Distribution)
if !ok {
return errors.New("incompatible value to add to distribution")
return false, errors.New("dist: incompatible value to add or subtract")
}

if !reflect.DeepEqual(d.lowerBounds, delta.lowerBounds) {
return fmt.Errorf("incompatible delta value, Bucket lower bounds in receiver distribution: %v, and in delta distribution: %v", d.lowerBounds, delta.lowerBounds)
return false, fmt.Errorf("incompatible delta value, Bucket lower bounds in receiver distribution: %v, and in delta distribution: %v", d.lowerBounds, delta.lowerBounds)
}
d.mu.Lock()
defer d.mu.Unlock()
delta.mu.RLock()
defer delta.mu.RUnlock()

if subtract {
// If receiver count is less than lastVal' count, assume reset and return.
if d.count < delta.count {
return true, nil
}
d.count -= delta.count
d.sum -= delta.sum
} else {
d.count += delta.count
d.sum += delta.sum
}

for i := 0; i < len(d.bucketCounts); i++ {
d.bucketCounts[i] += delta.bucketCounts[i]
if subtract {
d.bucketCounts[i] -= delta.bucketCounts[i]
} else {
d.bucketCounts[i] += delta.bucketCounts[i]
}
}
d.count += delta.count
d.sum += delta.sum
return nil

return false, nil
}

// String returns a string representation of the distribution:
Expand Down
19 changes: 19 additions & 0 deletions metrics/dist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,25 @@ func TestDistAdd(t *testing.T) {
verifyBucketCount(t, d, []int{0, 1, 2, 3, 4, 5}, []int64{1, 2, 0, 2, 0, 1})
}

func TestDistSubtractCounter(t *testing.T) {
lb := []float64{1, 5, 15, 30, 45}
d := NewDistribution(lb)

for _, s := range []float64{0.5, 4, 17} {
d.AddSample(s)
}

d2 := d.Clone().(*Distribution)
for _, s := range []float64{3.5, 21, 300} {
d2.AddSample(s)
}

if wasReset, err := d2.SubtractCounter(d); err != nil || wasReset {
t.Errorf("SubtractCounter error: %v, wasReset: %v", err, wasReset)
}
verifyBucketCount(t, d2, []int{0, 1, 2, 3, 4, 5}, []int64{0, 1, 0, 1, 0, 1})
}

func TestDistData(t *testing.T) {
lb := []float64{1, 5, 15, 30, 45}
d := NewDistribution(lb)
Expand Down
45 changes: 45 additions & 0 deletions metrics/eventmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@ func (em *EventMetrics) Update(in *EventMetrics) error {
}
}

// SubtractLast subtracts the provided (last) EventMetrics from the receiver
// EventMetrics and return the result as a GAUGE EventMetrics.
func (em *EventMetrics) SubtractLast(lastEM *EventMetrics) (*EventMetrics, error) {
if em.Kind != CUMULATIVE || lastEM.Kind != CUMULATIVE {
return nil, fmt.Errorf("incorrect eventmetrics kind (current: %v, last: %v), SubtractLast works only for CUMULATIVE metrics", em.Kind, lastEM.Kind)
}

gaugeEM := em.Clone()
gaugeEM.Kind = GAUGE

for name, lastVal := range lastEM.metrics {
val, ok := gaugeEM.metrics[name]
if !ok {
return nil, fmt.Errorf("receiver EventMetrics doesn't have %s metric", name)
}
wasReset, err := val.SubtractCounter(lastVal)
if err != nil {
return nil, err
}

// If any metric is reset, consider it a full reset of EventMetrics.
// TODO(manugarg): See if we can track this event somehow.
if wasReset {
gaugeEM := em.Clone()
gaugeEM.Kind = GAUGE
return gaugeEM, nil
}
}

return gaugeEM, nil
}

// String returns the string representation of the EventMetrics.
// Note that this is compatible with what vmwatcher understands.
// Example output string:
Expand Down Expand Up @@ -217,3 +249,16 @@ func (em *EventMetrics) String() string {
}
return b.String()
}

// Key returns a string key that uniquely identifies an eventmetrics.
func (em *EventMetrics) Key() string {
em.mu.RLock()
defer em.mu.RUnlock()

var keys []string
keys = append(keys, em.metricsKeys...)
for _, k := range em.LabelsKeys() {
keys = append(keys, k+"="+em.labels[k])
}
return strings.Join(keys, ",")
}
62 changes: 62 additions & 0 deletions metrics/eventmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,68 @@ func TestEventMetricsUpdate(t *testing.T) {
}
}

func TestEventMetricsSubtractCounters(t *testing.T) {
rttVal := NewInt(0)
rttVal.Str = func(i int64) string {
return fmt.Sprintf("%.3f", float64(i)/1000)
}
m := newEventMetrics(10, 10, 1000, make(map[string]int64))
m.AddLabel("ptype", "http")

// First run
m2 := newEventMetrics(32, 22, 220100, map[string]int64{
"200": 22,
})
gEM, err := m2.SubtractLast(m)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
verifyEventMetrics(t, gEM, 22, 12, 219100, map[string]int64{
"200": 22,
})

// Second run
m3 := newEventMetrics(42, 31, 300100, map[string]int64{
"200": 24,
"204": 8,
})

gEM, err = m3.SubtractLast(m2)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
verifyEventMetrics(t, gEM, 10, 9, 80000, map[string]int64{
"200": 2,
"204": 8,
})

// Third run, expect reset
m4 := newEventMetrics(10, 8, 1100, map[string]int64{
"200": 8,
})
gEM, err = m4.SubtractLast(m3)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
verifyEventMetrics(t, gEM, 10, 8, 1100, map[string]int64{
"200": 8,
})
}

func TestKey(t *testing.T) {
m := newEventMetrics(42, 31, 300100, map[string]int64{
"200": 24,
"204": 8,
}).AddLabel("probe", "google-homepage")

key := m.Key()
wantKey := "sent,rcvd,rtt,resp-code,probe=google-homepage"

if key != wantKey {
t.Errorf("Got key: %s, wanted: %s", key, wantKey)
}
}

func BenchmarkEventMetricsStringer(b *testing.B) {
em := newEventMetrics(32, 22, 220100, map[string]int64{
"200": 22,
Expand Down
16 changes: 16 additions & 0 deletions metrics/float.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ func (f *Float) Add(val Value) error {
return nil
}

// SubtractCounter subtracts the provided "lastVal", assuming that value
// represents a counter, i.e. if "value" is less than "lastVal", we assume that
// counter has been reset and don't subtract.
func (f *Float) SubtractCounter(lastVal Value) (bool, error) {
lv, ok := lastVal.(*Float)
if !ok {
return false, errors.New("incompatible value to add")
}
if f.f < lv.f {
return true, nil
}

f.f -= lv.f
return false, nil
}

// AddInt64 adds an int64 to the receiver Float.
func (f *Float) AddInt64(i int64) {
f.f += float64(i)
Expand Down
44 changes: 42 additions & 2 deletions metrics/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ func (i *Int) Add(val Value) error {
return nil
}

// SubtractCounter subtracts the provided "lastVal", assuming that value
// represents a counter, i.e. if "value" is less than "lastVal", we assume that
// counter has been reset and don't subtract.
func (i *Int) SubtractCounter(lastVal Value) (bool, error) {
lv, ok := lastVal.(*Int)
if !ok {
return false, errors.New("incompatible value to subtract")
}

if i.i < lv.i {
return true, nil
}

i.i -= lv.i
return false, nil
}

// AddInt64 adds an int64 to the receiver Int.
func (i *Int) AddInt64(ii int64) {
i.i += ii
Expand All @@ -93,8 +110,9 @@ func (i *Int) String() string {
return strconv.FormatInt(i.Int64(), 10)
}

// AtomicInt implements NumValue with int64 storage and atomic operations. If concurrency-safety
// is not a requirement, e.g. for use in already mutex protected map, you could use Int.
// AtomicInt implements NumValue with int64 storage and atomic operations. If
// concurrency-safety is not a requirement, e.g. for use in already mutex
// protected map, you could use Int.
type AtomicInt struct {
i int64
// If Str is defined, this is method used to convert AtomicInt into a string.
Expand Down Expand Up @@ -147,6 +165,28 @@ func (i *AtomicInt) Add(val Value) error {
return nil
}

// SubtractCounter subtracts the provided "lastVal". Note that this function
// is not fully atomic: we first load the values, compare them, and then update
// the receiver if required. There is a possibility that either receiver, or
// lastVal may change between loading of the values and updating them. We
// should still not get negative values though, as we use the snapshots to
// finally update the value.
func (i *AtomicInt) SubtractCounter(lastVal Value) (bool, error) {
lv, ok := lastVal.(NumValue)
if !ok {
return false, errors.New("incompatible value to subtract")
}

valS := i.Int64()
lvS := lv.Int64()

if valS < lvS {
return true, nil
}
atomic.StoreInt64(&i.i, valS-lvS)
return false, nil
}

// AddInt64 adds an int64 to the receiver Int.
func (i *AtomicInt) AddInt64(ii int64) {
atomic.AddInt64(&i.i, ii)
Expand Down
Loading