Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics-generator: support per-tenant processor configuration #1434

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,24 @@ func (cfg *ProcessorConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag
cfg.ServiceGraphs.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.SpanMetrics.RegisterFlagsAndApplyDefaults(prefix, f)
}

// copyWithOverrides returns a shallow copy of cfg in which each processor setting that has a
// non-nil override for userID is replaced by that override. The receiver itself is not modified;
// settings without an override keep the values they have in cfg.
func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userID string) ProcessorConfig {
	merged := *cfg
	serviceGraphs, spanMetrics := &merged.ServiceGraphs, &merged.SpanMetrics

	// Service graphs processor. Only a nil result means "no override"; a non-nil slice, even an
	// empty one, replaces the configured value.
	if sgBuckets := o.MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID); sgBuckets != nil {
		serviceGraphs.HistogramBuckets = sgBuckets
	}
	if sgDimensions := o.MetricsGeneratorProcessorServiceGraphsDimensions(userID); sgDimensions != nil {
		serviceGraphs.Dimensions = sgDimensions
	}

	// Span metrics processor, same rule as above.
	if smBuckets := o.MetricsGeneratorProcessorSpanMetricsHistogramBuckets(userID); smBuckets != nil {
		spanMetrics.HistogramBuckets = smBuckets
	}
	if smDimensions := o.MetricsGeneratorProcessorSpanMetricsDimensions(userID); smDimensions != nil {
		spanMetrics.Dimensions = smDimensions
	}

	return merged
}
56 changes: 56 additions & 0 deletions modules/generator/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package generator

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
)

// TestProcessorConfig_copyWithOverrides checks that copyWithOverrides applies the overrides to the
// returned copy only, and that a mock without any overrides yields a copy equal to the original.
func TestProcessorConfig_copyWithOverrides(t *testing.T) {
	base := &ProcessorConfig{
		ServiceGraphs: servicegraphs.Config{
			HistogramBuckets: []float64{1},
			Dimensions:       []string{},
		},
		SpanMetrics: spanmetrics.Config{
			HistogramBuckets: []float64{1, 2},
			Dimensions:       []string{"namespace"},
		},
	}

	t.Run("overrides buckets and dimension", func(t *testing.T) {
		tenantOverrides := &mockOverrides{
			serviceGraphsHistogramBuckets: []float64{1, 2},
			serviceGraphsDimensions:       []string{"namespace"},
			spanMetricsHistogramBuckets:   []float64{1, 2, 3},
			spanMetricsDimensions:         []string{"cluster", "namespace"},
		}

		got := base.copyWithOverrides(tenantOverrides, "tenant")

		assert.NotEqual(t, *base, got)

		// the original config must still hold the values it was constructed with
		assert.Equal(t, []float64{1}, base.ServiceGraphs.HistogramBuckets)
		assert.Equal(t, []string{}, base.ServiceGraphs.Dimensions)
		assert.Equal(t, []float64{1, 2}, base.SpanMetrics.HistogramBuckets)
		assert.Equal(t, []string{"namespace"}, base.SpanMetrics.Dimensions)

		// the copy must carry every override
		assert.Equal(t, []float64{1, 2}, got.ServiceGraphs.HistogramBuckets)
		assert.Equal(t, []string{"namespace"}, got.ServiceGraphs.Dimensions)
		assert.Equal(t, []float64{1, 2, 3}, got.SpanMetrics.HistogramBuckets)
		assert.Equal(t, []string{"cluster", "namespace"}, got.SpanMetrics.Dimensions)
	})

	t.Run("empty overrides", func(t *testing.T) {
		noOverrides := &mockOverrides{}

		got := base.copyWithOverrides(noOverrides, "tenant")

		// with no overrides set on the mock, the copy is expected to equal the original
		assert.Equal(t, *base, got)
	})
}
65 changes: 50 additions & 15 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package generator
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -56,7 +57,9 @@ type instance struct {

// processorsMtx protects the processors map, not the processors itself
processorsMtx sync.RWMutex
processors map[string]processor.Processor
// processors is a map of processor name -> processor, only one instance of a processor can be
// active at any time
processors map[string]processor.Processor

shutdownCh chan struct{}

Expand All @@ -83,7 +86,7 @@ func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverr
logger: logger,
}

err := i.updateProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
err := i.updateProcessors()
if err != nil {
return nil, fmt.Errorf("could not initialize processors: %w", err)
}
Expand All @@ -101,7 +104,7 @@ func (i *instance) watchOverrides() {
for {
select {
case <-ticker.C:
err := i.updateProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID))
err := i.updateProcessors()
if err != nil {
metricActiveProcessorsUpdateFailed.WithLabelValues(i.instanceID).Inc()
level.Error(i.logger).Log("msg", "updating the processors failed", "err", err)
Expand All @@ -113,60 +116,92 @@ func (i *instance) watchOverrides() {
}
}

func (i *instance) updateProcessors(desiredProcessors map[string]struct{}) error {
func (i *instance) updateProcessors() error {
desiredProcessors := i.overrides.MetricsGeneratorProcessors(i.instanceID)
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
desiredCfg := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID)

i.processorsMtx.RLock()
toAdd, toRemove := i.diffProcessors(desiredProcessors)
toAdd, toRemove, toReplace, err := i.diffProcessors(desiredProcessors, desiredCfg)
i.processorsMtx.RUnlock()

if len(toAdd) == 0 && len(toRemove) == 0 {
if err != nil {
return err
}
if len(toAdd) == 0 && len(toRemove) == 0 && len(toReplace) == 0 {
return nil
}

i.processorsMtx.Lock()
defer i.processorsMtx.Unlock()

for _, processorName := range toAdd {
err := i.addProcessor(processorName)
err := i.addProcessor(processorName, desiredCfg)
if err != nil {
return err
}
}
for _, processorName := range toRemove {
i.removeProcessor(processorName)
}
for _, processorName := range toReplace {
i.removeProcessor(processorName)

err := i.addProcessor(processorName, desiredCfg)
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
}

i.updateProcessorMetrics()

return nil
}

// diffProcessors compares the existings processors with desiredProcessors. Must be called under a
// read lock.
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}) (toAdd []string, toRemove []string) {
// diffProcessors compares the existing processors with the desired processors and config.
// Must be called under a read lock.
func (i *instance) diffProcessors(desiredProcessors map[string]struct{}, desiredCfg ProcessorConfig) (toAdd, toRemove, toReplace []string, err error) {
for processorName := range desiredProcessors {
if _, ok := i.processors[processorName]; !ok {
toAdd = append(toAdd, processorName)
}
}
for processorName := range i.processors {
for processorName, processor := range i.processors {
if _, ok := desiredProcessors[processorName]; !ok {
toRemove = append(toRemove, processorName)
continue
}
switch processorName {
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
case spanmetrics.Name:
if !reflect.DeepEqual(processor.Config(), desiredCfg.SpanMetrics) {
toReplace = append(toReplace, processorName)
}
case servicegraphs.Name:
if !reflect.DeepEqual(processor.Config(), desiredCfg.ServiceGraphs) {
toReplace = append(toReplace, processorName)
}
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(allSupportedProcessors, ", ")),
"processorName", processorName,
)
err = fmt.Errorf("unknown processor %s", processorName)
return
}
}
return toAdd, toRemove
return
}

// addProcessor registers the processor and adds it to the processors map. Must be called under a
// write lock.
func (i *instance) addProcessor(processorName string) error {
func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error {
level.Debug(i.logger).Log("msg", "adding processor", "processorName", processorName)

var newProcessor processor.Processor
switch processorName {
case spanmetrics.Name:
newProcessor = spanmetrics.New(i.cfg.Processor.SpanMetrics, i.registry)
newProcessor = spanmetrics.New(cfg.SpanMetrics, i.registry)
case servicegraphs.Name:
newProcessor = servicegraphs.New(i.cfg.Processor.ServiceGraphs, i.instanceID, i.registry, i.logger)
newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger)
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(allSupportedProcessors, ", ")),
Expand Down
71 changes: 36 additions & 35 deletions modules/generator/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package generator

import (
"context"
"flag"
"os"
"testing"
"time"

Expand All @@ -20,7 +22,8 @@ import (
)

func Test_instance_concurrency(t *testing.T) {
instance, err := newInstance(&Config{}, "test", &mockOverrides{}, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
overrides := &mockOverrides{}
instance, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
assert.NoError(t, err)

end := make(chan struct{})
Expand All @@ -42,18 +45,16 @@ func Test_instance_concurrency(t *testing.T) {
})

go accessor(func() {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"span-metrics": {},
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.NoError(t, err)
})

go accessor(func() {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"service-graphs": {},
}
err := instance.updateProcessors(processors)
err = instance.updateProcessors()
assert.NoError(t, err)
})

Expand All @@ -67,7 +68,12 @@ func Test_instance_concurrency(t *testing.T) {
}

func Test_instance_updateProcessors(t *testing.T) {
instance, err := newInstance(&Config{}, "test", &mockOverrides{}, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger())
cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
overrides := mockOverrides{}

instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger)
assert.NoError(t, err)

// stop the update goroutine
Expand All @@ -77,56 +83,51 @@ func Test_instance_updateProcessors(t *testing.T) {
assert.Len(t, instance.processors, 0)

t.Run("add new processor", func(t *testing.T) {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.NoError(t, err)

assert.Len(t, instance.processors, 1)
assert.Equal(t, instance.processors[servicegraphs.Name].Name(), servicegraphs.Name)
})

t.Run("add unknown processor", func(t *testing.T) {
processors := map[string]struct{}{
overrides.processors = map[string]struct{}{
"span-metricsss": {}, // typo in the overrides
}
err := instance.updateProcessors(processors)
err := instance.updateProcessors()
assert.Error(t, err)

// existing processors should not be removed when adding a new processor fails
assert.Len(t, instance.processors, 1)
assert.Equal(t, instance.processors[servicegraphs.Name].Name(), servicegraphs.Name)
})

t.Run("remove processor", func(t *testing.T) {
err := instance.updateProcessors(nil)
assert.NoError(t, err)

assert.Len(t, instance.processors, 0)
})
}

type mockOverrides struct {
processors map[string]struct{}
}
t.Run("replace processor", func(t *testing.T) {
overrides.processors = map[string]struct{}{
servicegraphs.Name: {},
}
overrides.serviceGraphsDimensions = []string{"namespace"}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
err := instance.updateProcessors()
assert.NoError(t, err)

func (m *mockOverrides) MetricsGeneratorMaxActiveSeries(userID string) uint32 {
return 0
}
var expectedConfig servicegraphs.Config
expectedConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
expectedConfig.Dimensions = []string{"namespace"}

func (m *mockOverrides) MetricsGeneratorCollectionInterval(userID string) time.Duration {
return 15 * time.Second
}
assert.Equal(t, expectedConfig, instance.processors[servicegraphs.Name].Config())
})

func (m *mockOverrides) MetricsGeneratorProcessors(userID string) map[string]struct{} {
return m.processors
}
t.Run("remove processor", func(t *testing.T) {
overrides.processors = nil
err := instance.updateProcessors()
assert.NoError(t, err)

func (m *mockOverrides) MetricsGeneratorDisableCollection(userID string) bool {
return false
assert.Len(t, instance.processors, 0)
})
}

type noopStorage struct{}
Expand Down
4 changes: 4 additions & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ type metricsGeneratorOverrides interface {
registry.Overrides

MetricsGeneratorProcessors(userID string) map[string]struct{}
MetricsGeneratorProcessorServiceGraphsHistogramBuckets(userID string) []float64
MetricsGeneratorProcessorServiceGraphsDimensions(userID string) []string
MetricsGeneratorProcessorSpanMetricsHistogramBuckets(userID string) []float64
MetricsGeneratorProcessorSpanMetricsDimensions(userID string) []string
}

var _ metricsGeneratorOverrides = (*overrides.Overrides)(nil)
Loading