Skip to content

Commit

Permalink
metrics-generator: support per-tenant processor configuration (#1434)
Browse files Browse the repository at this point in the history
* metrics-generator: support per-tenant processor configuration

* avoid duplicate metrics in the registry

* fix tests, linting

* Use type switch

* fix tests as well

* Add docs, changelog

* code review suggestions

* Update docs/tempo/website/configuration/_index.md

Co-authored-by: Kim Nylander <[email protected]>

Co-authored-by: Kim Nylander <[email protected]>
  • Loading branch information
Koenraad Verheyden and knylander-grafana authored May 18, 2022
1 parent 4e0f12d commit 75249be
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn)
* [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott)

## v1.4.1 / 2022-05-05
Expand Down
7 changes: 7 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,13 @@ overrides:
# - span-metrics
[metrics_generator_processors: <list of strings>]

# Per-user configuration of the metrics-generator processors. The following configuration
# overrides settings in the global configuration.
[metrics_generator_processor_service_graphs_histogram_buckets: <list of float>]
[metrics_generator_processor_service_graphs_dimensions: <list of string>]
[metrics_generator_processor_span_metrics_histogram_buckets: <list of float>]
[metrics_generator_processor_span_metrics_dimensions: <list of string>]

# Maximum number of active series in the registry, per instance of the metrics-generator. A
# value of 0 disables this check.
# If the limit is reached, no new series will be added but existing series will still be
Expand Down
20 changes: 20 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,23 @@ func (cfg *ProcessorConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag
cfg.ServiceGraphs.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.SpanMetrics.RegisterFlagsAndApplyDefaults(prefix, f)
}

// copyWithOverrides returns a copy of cfg with the per-tenant overrides for userID applied on top.
//
// Each of the four processor settings (histogram buckets and dimensions, for both the service
// graphs and the span metrics processor) is replaced only when the matching override is non-nil;
// a nil override leaves the value taken from cfg in place. An empty but non-nil override is
// applied as-is. The struct is copied by value and overridden fields are reassigned on the copy,
// so cfg itself is never written to.
func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userID string) ProcessorConfig {
	merged := *cfg

	// Work through pointers into the copy so the assignments below stay short.
	serviceGraphs, spanMetrics := &merged.ServiceGraphs, &merged.SpanMetrics

	if v := o.MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID); v != nil {
		serviceGraphs.HistogramBuckets = v
	}
	if v := o.MetricsGeneratorProcessorServiceGraphsDimensions(userID); v != nil {
		serviceGraphs.Dimensions = v
	}
	if v := o.MetricsGeneratorProcessorSpanMetricsHistogramBuckets(userID); v != nil {
		spanMetrics.HistogramBuckets = v
	}
	if v := o.MetricsGeneratorProcessorSpanMetricsDimensions(userID); v != nil {
		spanMetrics.Dimensions = v
	}

	return merged
}
56 changes: 56 additions & 0 deletions modules/generator/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package generator

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
)

// TestProcessorConfig_copyWithOverrides verifies that copyWithOverrides applies overrides to the
// returned copy while leaving the receiver untouched, and that a zero-value mockOverrides yields a
// copy equal to the original.
func TestProcessorConfig_copyWithOverrides(t *testing.T) {
	base := &ProcessorConfig{
		ServiceGraphs: servicegraphs.Config{
			HistogramBuckets: []float64{1},
			Dimensions:       []string{},
		},
		SpanMetrics: spanmetrics.Config{
			HistogramBuckets: []float64{1, 2},
			Dimensions:       []string{"namespace"},
		},
	}

	t.Run("overrides buckets and dimension", func(t *testing.T) {
		tenantOverrides := &mockOverrides{
			serviceGraphsHistogramBuckets: []float64{1, 2},
			serviceGraphsDimensions:       []string{"namespace"},
			spanMetricsHistogramBuckets:   []float64{1, 2, 3},
			spanMetricsDimensions:         []string{"cluster", "namespace"},
		}

		got := base.copyWithOverrides(tenantOverrides, "tenant")

		// the copy must differ from the config it was derived from
		assert.NotEqual(t, *base, got)

		// the receiver still holds its initial values
		assert.Equal(t, []float64{1}, base.ServiceGraphs.HistogramBuckets)
		assert.Equal(t, []string{}, base.ServiceGraphs.Dimensions)
		assert.Equal(t, []float64{1, 2}, base.SpanMetrics.HistogramBuckets)
		assert.Equal(t, []string{"namespace"}, base.SpanMetrics.Dimensions)

		// the copy carries every override
		assert.Equal(t, []float64{1, 2}, got.ServiceGraphs.HistogramBuckets)
		assert.Equal(t, []string{"namespace"}, got.ServiceGraphs.Dimensions)
		assert.Equal(t, []float64{1, 2, 3}, got.SpanMetrics.HistogramBuckets)
		assert.Equal(t, []string{"cluster", "namespace"}, got.SpanMetrics.Dimensions)
	})

	t.Run("empty overrides", func(t *testing.T) {
		got := base.copyWithOverrides(&mockOverrides{}, "tenant")

		assert.Equal(t, *base, got)
	})
}
66 changes: 51 additions & 15 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package generator
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -56,7 +57,9 @@ type instance struct {

// processorsMtx protects the processors map, not the processors itself
processorsMtx sync.RWMutex
processors map[string]processor.Processor
// processors is a map of processor name -> processor, only one instance of a processor can be
// active at any time
processors map[string]processor.Processor

shutdownCh chan struct{}

Expand All @@ -83,7 +86,7 @@ func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverr
logger: logger,
}

err := i.updateProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
err := i.updateProcessors()
if err != nil {
return nil, fmt.Errorf("could not initialize processors: %w", err)
}
Expand All @@ -101,7 +104,7 @@ func (i *instance) watchOverrides() {
for {
select {
case <-ticker.C:
err := i.updateProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
err := i.updateProcessors()
if err != nil {
metricActiveProcessorsUpdateFailed.WithLabelValues(i.instanceID).Inc()
level.Error(i.logger).Log("msg", "updating the processors failed", "err", err)
Expand All @@ -113,60 +116,93 @@ func (i *instance) watchOverrides() {
}
}

func (i *instance) updateProcessors(desiredProcessors map[string]struct{}) error {
func (i *instance) updateProcessors() error {
desiredProcessors := i.overrides.MetricsGeneratorProcessors(i.instanceID)
desiredCfg := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID)

i.processorsMtx.RLock()
toAdd, toRemove := i.diffProcessors(desiredProcessors)
toAdd, toRemove, toReplace, err := i.diffProcessors(desiredProcessors, desiredCfg)
i.processorsMtx.RUnlock()

if len(toAdd) == 0 && len(toRemove) == 0 {
if err != nil {
return err
}
if len(toAdd) == 0 && len(toRemove) == 0 && len(toReplace) == 0 {
return nil
}

i.processorsMtx.Lock()
defer i.processorsMtx.Unlock()

for _, processorName := range toAdd {
err := i.addProcessor(processorName)
err := i.addProcessor(processorName, desiredCfg)
if err != nil {
return err
}
}
for _, processorName := range toRemove {
i.removeProcessor(processorName)
}
for _, processorName := range toReplace {
i.removeProcessor(processorName)

err := i.addProcessor(processorName, desiredCfg)
if err != nil {
return err
}
}

i.updateProcessorMetrics()

return nil
}

// diffProcessors compares the existings processors with desiredProcessors. Must be called under a
// read lock.
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}) (toAdd []string, toRemove []string) {
// diffProcessors compares the existing processors with the desired processors and config.
// Must be called under a read lock.
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}, desiredCfg ProcessorConfig) (toAdd, toRemove, toReplace []string, err error) {
for processorName := range desiredProcessors {
if _, ok := i.processors[processorName]; !ok {
toAdd = append(toAdd, processorName)
}
}
for processorName := range i.processors {
for processorName, processor := range i.processors {
if _, ok := desiredProcessors[processorName]; !ok {
toRemove = append(toRemove, processorName)
continue
}

switch p := processor.(type) {
case *spanmetrics.Processor:
if !reflect.DeepEqual(p.Cfg, desiredCfg.SpanMetrics) {
toReplace = append(toReplace, processorName)
}
case *servicegraphs.Processor:
if !reflect.DeepEqual(p.Cfg, desiredCfg.ServiceGraphs) {
toReplace = append(toReplace, processorName)
}
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(allSupportedProcessors, ", ")),
"processorName", processorName,
)
err = fmt.Errorf("unknown processor %s", processorName)
return
}
}
return toAdd, toRemove
return
}

// addProcessor registers the processor and adds it to the processors map. Must be called under a
// write lock.
func (i *instance) addProcessor(processorName string) error {
func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error {
level.Debug(i.logger).Log("msg", "adding processor", "processorName", processorName)

var newProcessor processor.Processor
switch processorName {
case spanmetrics.Name:
newProcessor = spanmetrics.New(i.cfg.Processor.SpanMetrics, i.registry)
newProcessor = spanmetrics.New(cfg.SpanMetrics, i.registry)
case servicegraphs.Name:
newProcessor = servicegraphs.New(i.cfg.Processor.ServiceGraphs, i.instanceID, i.registry, i.logger)
newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger)
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(allSupportedProcessors, ", ")),
Expand Down
71 changes: 36 additions & 35 deletions modules/generator/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package generator

import (
"context"
"flag"
"os"
"testing"
"time"

Expand All @@ -20,7 +22,8 @@ import (
)

func Test_instance_concurrency(t *testing.T) {
instance, err := newInstance(&Config{}, "test", &mockOverrides{}, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
overrides := &mockOverrides{}
instance, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
assert.NoError(t, err)

end := make(chan struct{})
Expand All @@ -42,18 +45,16 @@ func Test_instance_concurrency(t *testing.T) {
})

go accessor(func() {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"span-metrics": {},
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.NoError(t, err)
})

go accessor(func() {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"service-graphs": {},
}
err := instance.updateProcessors(processors)
err = instance.updateProcessors()
assert.NoError(t, err)
})

Expand All @@ -67,7 +68,12 @@ func Test_instance_concurrency(t *testing.T) {
}

func Test_instance_updateProcessors(t *testing.T) {
instance, err := newInstance(&Config{}, "test", &mockOverrides{}, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
overrides := mockOverrides{}

instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger)
assert.NoError(t, err)

// stop the update goroutine
Expand All @@ -77,56 +83,51 @@ func Test_instance_updateProcessors(t *testing.T) {
assert.Len(t, instance.processors, 0)

t.Run("add new processor", func(t *testing.T) {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.NoError(t, err)

assert.Len(t, instance.processors, 1)
assert.Equal(t, instance.processors[servicegraphs.Name].Name(), servicegraphs.Name)
})

t.Run("add unknown processor", func(t *testing.T) {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"span-metricsss": {}, // typo in the overrides
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.Error(t, err)

// existing processors should not be removed when adding a new processor fails
assert.Len(t, instance.processors, 1)
assert.Equal(t, instance.processors[servicegraphs.Name].Name(), servicegraphs.Name)
})

t.Run("remove processor", func(t *testing.T) {
err := instance.updateProcessors(nil)
assert.NoError(t, err)

assert.Len(t, instance.processors, 0)
})
}

type mockOverrides struct {
processors map[string]struct{}
}
t.Run("replace processor", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
}
overrides.serviceGraphsDimensions = []string{"namespace"}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
err := instance.updateProcessors()
assert.NoError(t, err)

func (m *mockOverrides) MetricsGeneratorMaxActiveSeries(userID string) uint32 {
return 0
}
var expectedConfig servicegraphs.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}

func (m *mockOverrides) MetricsGeneratorCollectionInterval(userID string) time.Duration {
return 15 * time.Second
}
assert.Equal(t, expectedConfig, instance.processors[servicegraphs.Name].(*servicegraphs.Processor).Cfg)
})

func (m *mockOverrides) MetricsGeneratorProcessors(userID string) map[string]struct{} {
return m.processors
}
t.Run("remove processor", func(t *testing.T) {
overrides.processors = nil
err := instance.updateProcessors()
assert.NoError(t, err)

func (m *mockOverrides) MetricsGeneratorDisableCollection(userID string) bool {
return false
assert.Len(t, instance.processors, 0)
})
}

type noopStorage struct{}
Expand Down
4 changes: 4 additions & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ type metricsGeneratorOverrides interface {
registry.Overrides

MetricsGeneratorProcessors(userID string) map[string]struct{}
MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID string) []float64
MetricsGeneratorProcessorServiceGraphsDimensions(userID string) []string
MetricsGeneratorProcessorSpanMetricsHistogramBuckets(userID string) []float64
MetricsGeneratorProcessorSpanMetricsDimensions(userID string) []string
}

var _ metricsGeneratorOverrides = (*overrides.Overrides)(nil)
Loading

0 comments on commit 75249be

Please sign in to comment.