Skip to content

Commit

Permalink
Merge pull request #11121 from hashicorp/backport/1.10.x/11090
Browse files Browse the repository at this point in the history
Backport #11090 into release/1.10.x
  • Loading branch information
clly authored Sep 23, 2021
2 parents c8405a1 + 2a27cf9 commit 6dcf27a
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/11090.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
telemetry: Add new metrics for the count of KV entries in the Consul store.
```
29 changes: 29 additions & 0 deletions agent/consul/state/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const (
serviceNamesUsageTable = "service-names"
kvUsageTable = "kv-entries"
)

// usageTableSchema returns a new table schema used for tracking various indexes
Expand Down Expand Up @@ -46,6 +47,11 @@ type ServiceUsage struct {
EnterpriseServiceUsage
}

type KVUsage struct {
KVCount int
EnterpriseKVUsage
}

type uniqueServiceState int

const (
Expand Down Expand Up @@ -85,6 +91,9 @@ func updateUsage(tx WriteTxn, changes Changes) error {
} else {
serviceNameChanges[svc.CompoundServiceName()] += delta
}
case "kvs":
usageDeltas[change.Table] += delta
addEnterpriseKVUsage(usageDeltas, change)
}
}

Expand Down Expand Up @@ -246,6 +255,26 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
return serviceInstances.Index, results, nil
}

func (s *Store) KVUsage() (uint64, KVUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()

kvs, err := firstUsageEntry(tx, "kvs")
if err != nil {
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
}

usage := KVUsage{
KVCount: kvs.Count,
}
results, err := compileEnterpriseKVUsage(tx, usage)
if err != nil {
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
}

return kvs.Index, results, nil
}

func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
usage, err := tx.First("usage", "id", id)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions agent/consul/state/usage_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@ import (

type EnterpriseServiceUsage struct{}

type EnterpriseKVUsage struct{}

func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {}

func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {}

func addEnterpriseKVUsage(map[string]int, memdb.Change) {}

func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
return usage, nil
}

func compileEnterpriseKVUsage(tx ReadTxn, usage KVUsage) (KVUsage, error) {
return usage, nil
}
38 changes: 38 additions & 0 deletions agent/consul/state/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,44 @@ func TestStateStore_Usage_NodeCount_Delete(t *testing.T) {
require.Equal(t, count, 1)
}

func TestStateStore_Usage_KVUsage(t *testing.T) {
s := testStateStore(t)

// No keys have been registered, and thus no usage entry exists
idx, usage, err := s.KVUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(0))
require.Equal(t, usage.KVCount, 0)

testSetKey(t, s, 0, "key-1", "0", nil)
testSetKey(t, s, 1, "key-2", "0", nil)
testSetKey(t, s, 2, "key-2", "1", nil)

idx, usage, err = s.KVUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.KVCount, 2)
}

func TestStateStore_Usage_KVUsage_Delete(t *testing.T) {
s := testStateStore(t)

testSetKey(t, s, 0, "key-1", "0", nil)
testSetKey(t, s, 1, "key-2", "0", nil)
testSetKey(t, s, 2, "key-2", "1", nil)

idx, usage, err := s.KVUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.KVCount, 2)

require.NoError(t, s.KVSDelete(3, "key-2", nil))
idx, usage, err = s.KVUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(3))
require.Equal(t, usage.KVCount, 1)
}

func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
s := testStateStore(t)

Expand Down
12 changes: 12 additions & 0 deletions agent/consul/usagemetrics/usagemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ var Gauges = []prometheus.GaugeDefinition{
Name: []string{"consul", "members", "servers"},
Help: "Measures the current number of server agents registered with Consul. It is only emitted by Consul servers. Added in v1.9.6.",
},
{
Name: []string{"consul", "kv", "entries"},
Help: "Measures the current number of server agents registered with Consul. It is only emitted by Consul servers. Added in v1.10.3.",
},
}

type getMembersFunc func() []serf.Member
Expand Down Expand Up @@ -144,6 +148,7 @@ func (u *UsageMetricsReporter) Run(ctx context.Context) {
}

func (u *UsageMetricsReporter) runOnce() {
u.logger.Trace("Starting usage run")
state := u.stateProvider.State()
_, nodes, err := state.NodeCount()
if err != nil {
Expand All @@ -164,6 +169,13 @@ func (u *UsageMetricsReporter) runOnce() {

servers, clients := u.memberUsage()
u.emitMemberUsage(servers, clients)

_, kvUsage, err := state.KVUsage()
if err != nil {
u.logger.Warn("failed to retrieve kv entry usage from state store", "error", err)
}

u.emitKVUsage(kvUsage)
}

func (u *UsageMetricsReporter) memberUsage() (int, map[string]int) {
Expand Down
8 changes: 8 additions & 0 deletions agent/consul/usagemetrics/usagemetrics_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ func (u *UsageMetricsReporter) emitServiceUsage(serviceUsage state.ServiceUsage)
u.metricLabels,
)
}

func (u *UsageMetricsReporter) emitKVUsage(kvUsage state.KVUsage) {
metrics.SetGaugeWithLabels(
[]string{"consul", "state", "kv_entries"},
float32(kvUsage.KVCount),
u.metricLabels,
)
}
168 changes: 168 additions & 0 deletions agent/consul/usagemetrics/usagemetrics_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
Name: "consul.usage.test.consul.state.kv_entries",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
getMembersFunc: func() []serf.Member { return []serf.Member{} },
},
Expand Down Expand Up @@ -151,6 +156,11 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
{Name: "datacenter", Value: "dc1"},
},
},
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
Name: "consul.usage.test.consul.state.kv_entries",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
},
}
Expand Down Expand Up @@ -189,3 +199,161 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) {
})
}
}

func TestUsageReporter_emitKVUsage_OSS(t *testing.T) {
type testCase struct {
modfiyStateStore func(t *testing.T, s *state.Store)
getMembersFunc getMembersFunc
expectedGauges map[string]metrics.GaugeValue
}
cases := map[string]testCase{
"empty-state": {
expectedGauges: map[string]metrics.GaugeValue{
// --- node ---
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
// --- member ---
"consul.usage.test.consul.members.clients;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.members.servers;datacenter=dc1": {
Name: "consul.usage.test.consul.members.servers",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
// --- service ---
"consul.usage.test.consul.state.services;datacenter=dc1": {
Name: "consul.usage.test.consul.state.services",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
Name: "consul.usage.test.consul.state.service_instances",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
Name: "consul.usage.test.consul.state.kv_entries",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
getMembersFunc: func() []serf.Member { return []serf.Member{} },
},
"nodes": {
modfiyStateStore: func(t *testing.T, s *state.Store) {
require.NoError(t, s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
require.NoError(t, s.EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
require.NoError(t, s.EnsureNode(3, &structs.Node{Node: "baz", Address: "127.0.0.2"}))

require.NoError(t, s.KVSSet(4, &structs.DirEntry{Key: "a", Value: []byte{1}}))
require.NoError(t, s.KVSSet(5, &structs.DirEntry{Key: "b", Value: []byte{1}}))
require.NoError(t, s.KVSSet(6, &structs.DirEntry{Key: "c", Value: []byte{1}}))
require.NoError(t, s.KVSSet(7, &structs.DirEntry{Key: "d", Value: []byte{1}}))
require.NoError(t, s.KVSDelete(8, "d", &structs.EnterpriseMeta{}))
require.NoError(t, s.KVSDelete(9, "c", &structs.EnterpriseMeta{}))
require.NoError(t, s.KVSSet(10, &structs.DirEntry{Key: "e", Value: []byte{1}}))
require.NoError(t, s.KVSSet(11, &structs.DirEntry{Key: "f", Value: []byte{1}}))
},
getMembersFunc: func() []serf.Member {
return []serf.Member{
{
Name: "foo",
Tags: map[string]string{"role": "consul"},
Status: serf.StatusAlive,
},
{
Name: "bar",
Tags: map[string]string{"role": "consul"},
Status: serf.StatusAlive,
},
{
Name: "baz",
Tags: map[string]string{"role": "node"},
Status: serf.StatusAlive,
},
}
},
expectedGauges: map[string]metrics.GaugeValue{
// --- node ---
"consul.usage.test.consul.state.nodes;datacenter=dc1": {
Name: "consul.usage.test.consul.state.nodes",
Value: 3,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
// --- member ---
"consul.usage.test.consul.members.servers;datacenter=dc1": {
Name: "consul.usage.test.consul.members.servers",
Value: 2,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.members.clients;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 1,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.members.clients;segment=;datacenter=dc1": {
Name: "consul.usage.test.consul.members.clients",
Value: 1,
Labels: []metrics.Label{{Name: "segment", Value: ""}, {Name: "datacenter", Value: "dc1"}},
},
// --- service ---
"consul.usage.test.consul.state.services;datacenter=dc1": {
Name: "consul.usage.test.consul.state.services",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.state.service_instances;datacenter=dc1": {
Name: "consul.usage.test.consul.state.service_instances",
Value: 0,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
"consul.usage.test.consul.state.kv_entries;datacenter=dc1": {
Name: "consul.usage.test.consul.state.kv_entries",
Value: 4,
Labels: []metrics.Label{{Name: "datacenter", Value: "dc1"}},
},
},
},
}

for name, tcase := range cases {
t.Run(name, func(t *testing.T) {
// Only have a single interval for the test
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul.usage.test")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, sink)

mockStateProvider := &mockStateProvider{}
s, err := newStateStore()
require.NoError(t, err)
if tcase.modfiyStateStore != nil {
tcase.modfiyStateStore(t, s)
}
mockStateProvider.On("State").Return(s)

reporter, err := NewUsageMetricsReporter(
new(Config).
WithStateProvider(mockStateProvider).
WithLogger(testutil.Logger(t)).
WithDatacenter("dc1").
WithGetMembersFunc(tcase.getMembersFunc),
)
require.NoError(t, err)

reporter.runOnce()

intervals := sink.Data()
require.Len(t, intervals, 1)
intv := intervals[0]

assertEqualGaugeMaps(t, tcase.expectedGauges, intv.Gauges)
})
}
}
19 changes: 19 additions & 0 deletions agent/consul/usagemetrics/usagemetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -23,6 +24,24 @@ func (m *mockStateProvider) State() *state.Store {
return retValues.Get(0).(*state.Store)
}

func assertEqualGaugeMaps(t *testing.T, expectedMap, foundMap map[string]metrics.GaugeValue) {
t.Helper()

for key := range foundMap {
if _, ok := expectedMap[key]; !ok {
t.Errorf("found unexpected gauge key: %s", key)
}
}

for key, expected := range expectedMap {
if _, ok := foundMap[key]; !ok {
t.Errorf("did not find expected gauge key: %s", key)
continue
}
assert.Equal(t, expected, foundMap[key], "gauge key mismatch on %q", key)
}
}

func TestUsageReporter_Run_Nodes(t *testing.T) {
type testCase struct {
modfiyStateStore func(t *testing.T, s *state.Store)
Expand Down
1 change: 1 addition & 0 deletions website/content/docs/agent/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ This is a full list of metrics emitted by Consul.
| `consul.state.nodes` | Measures the current number of nodes registered with Consul. It is only emitted by Consul servers. Added in v1.9.0. | number of objects | gauge |
| `consul.state.services` | Measures the current number of unique services registered with Consul, based on service name. It is only emitted by Consul servers. Added in v1.9.0. | number of objects | gauge |
| `consul.state.service_instances` | Measures the current number of unique service instances registered with Consul. It is only emitted by Consul servers. Added in v1.9.0. | number of objects | gauge |
| `consul.state.kv_entries` | Measures the current number of unique KV entries written in Consul. It is only emitted by Consul servers. Added in v1.10.3. | number of objects | gauge |
| `consul.members.clients` | Measures the current number of client agents registered with Consul. It is only emitted by Consul servers. Added in v1.9.6. | number of clients | gauge |
| `consul.members.servers` | Measures the current number of server agents registered with Consul. It is only emitted by Consul servers. Added in v1.9.6. | number of servers | gauge |
| `consul.dns.stale_queries` | Increments when an agent serves a query within the allowed stale threshold. | queries | counter |
Expand Down

0 comments on commit 6dcf27a

Please sign in to comment.