Skip to content

Commit

Permalink
Merge pull request #48 from SumoLogic/span-filtering
Browse files Browse the repository at this point in the history
Add span filtering capability
  • Loading branch information
pmm-sumo authored Jan 11, 2021
2 parents ec66429 + 77a85af commit 6795013
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 40 deletions.
30 changes: 24 additions & 6 deletions processor/filterprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
# Filter Processor

Supported pipeline types: metrics
Supported pipeline types: traces, metrics

The filter processor can be configured to include or exclude metrics based on
metric name in the case of the 'strict' or 'regexp' match types, or based on other
metric attributes in the case of the 'expr' match type. Please refer to
[config.go](./config.go) for the config spec.

It takes a pipeline type, of which only `metrics` is supported, followed by an
action:
It takes a pipeline type, of which only `metrics` and `spans` is supported, where following actions are available:
- `include`: Any names NOT matching filters are excluded from remainder of pipeline
- `exclude`: Any names matching filters are excluded from remainder of pipeline

For the actions the following parameters are required:
For the `metrics` actions the following parameters are required:
- `match_type`: strict|regexp|expr
- `metric_names`: (only for a `match_type` of 'strict' or 'regexp') list of strings or re2 regex patterns
- `expressions`: (only for a `match_type` of 'expr') list of expr expressions (see "Using an 'expr' match_type" below)

More details can found at [include/exclude metrics](../README.md#includeexclude-metrics).
For `spans`, following parameters are available:
- `match_type`: strict|regexp|expr
- `services`: list of strings for matching service names
- `span_names`: list of strings for matching span names
- `attributes`: list of attributes to match against

More details can found at [include/exclude](../README.md#includeexclude-metrics).

Examples:

```yaml
processors:
processors:
filter/1:
metrics:
include:
Expand All @@ -35,6 +40,19 @@ processors:
metric_names:
- hello_world
- hello/world
filter/2:
spans:
include:
match_type: regexp
span_names:
# re2 regexp patterns
- ^prefix/.*
- .*/suffix
exclude:
match_type: regexp
span_names:
- ^other_prefix/.*
- .*/other_suffix
```
Refer to the config files in [testdata](./testdata) for detailed
Expand Down
15 changes: 15 additions & 0 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,28 @@ package filterprocessor

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/internal/processor/filterconfig"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
)

// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Metrics MetricFilters `mapstructure:"metrics"`
Spans SpanFilters `mapstructure:"spans"`
}

// SpanFilters filters by Span properties
type SpanFilters struct {
// Include match properties describe spans that should be included in the Collector Service pipeline,
// all other spans should be dropped from further processing.
// If both Include and Exclude are specified, Include filtering occurs first.
Include *filterconfig.MatchProperties `mapstructure:"include"`

// Exclude match properties describe spans that should be excluded from the Collector Service pipeline,
// all other spans should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`
}

// MetricFilter filters by Metric properties.
Expand Down
44 changes: 44 additions & 0 deletions processor/filterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"path"
"testing"

"go.opentelemetry.io/collector/internal/processor/filterconfig"
"go.opentelemetry.io/collector/internal/processor/filterset"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -311,3 +314,44 @@ func TestLoadingConfigExpr(t *testing.T) {
})
}
}

func TestLoadingConfigSpans(t *testing.T) {
factories, err := componenttest.ExampleComponents()
require.NoError(t, err)
factory := NewFactory()
factories.Processors[configmodels.Type(typeStr)] = factory
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_spans.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, config)

tests := []struct {
filterName string
expCfg configmodels.Processor
}{
{
filterName: "filter/spans",
expCfg: &Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: "filter/spans",
TypeVal: typeStr,
},
Spans: SpanFilters{
Include: &filterconfig.MatchProperties{
Config: filterset.Config{MatchType: "regexp"},
SpanNames: []string{"prefix/.*", ".*/suffix"},
},
Exclude: &filterconfig.MatchProperties{
Config: filterset.Config{MatchType: "regexp"},
SpanNames: []string{"other_prefix/.*", ".*/other_suffix"},
},
},
},
},
}
for _, test := range tests {
t.Run(test.filterName, func(t *testing.T) {
cfg := config.Processors[test.filterName]
assert.Equal(t, test.expCfg, cfg)
})
}
}
18 changes: 18 additions & 0 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTracesProcessor),
processorhelper.WithMetrics(createMetricsProcessor))
}

Expand Down Expand Up @@ -63,3 +64,20 @@ func createMetricsProcessor(
fp,
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.TracesConsumer,
) (component.TracesProcessor, error) {
fp, err := newFilterSpanProcessor(params.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewTraceProcessor(
cfg,
nextConsumer,
fp,
processorhelper.WithCapabilities(processorCapabilities))
}
5 changes: 2 additions & 3 deletions processor/filterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ func TestCreateProcessors(t *testing.T) {
factory := NewFactory()

tp, tErr := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
// Not implemented error
assert.NotNil(t, tErr)
assert.Nil(t, tp)
assert.Equal(t, test.succeed, tp != nil)
assert.Equal(t, test.succeed, tErr == nil)

mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewMetricsNop())
assert.Equal(t, test.succeed, mp != nil)
Expand Down
156 changes: 129 additions & 27 deletions processor/filterprocessor/filter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,32 @@ package filterprocessor
import (
"context"

"go.opentelemetry.io/collector/internal/processor/filterconfig"
"go.opentelemetry.io/collector/internal/processor/filterspan"

"go.uber.org/zap"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
"go.opentelemetry.io/collector/processor/processorhelper"
)

type filterMetricProcessor struct {
cfg *Config
include filtermetric.Matcher
exclude filtermetric.Matcher
logger *zap.Logger
type filterProcessor struct {
cfg *Config
includeMetrics filtermetric.Matcher
excludeMetrics filtermetric.Matcher
includeSpans filterspan.Matcher
excludeSpans filterspan.Matcher
logger *zap.Logger
}

func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterMetricProcessor, error) {
inc, err := createMatcher(cfg.Metrics.Include)
func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterProcessor, error) {
inc, err := createMetricsMatcher(cfg.Metrics.Include)
if err != nil {
return nil, err
}

exc, err := createMatcher(cfg.Metrics.Exclude)
exc, err := createMetricsMatcher(cfg.Metrics.Exclude)
if err != nil {
return nil, err
}
Expand All @@ -62,32 +67,86 @@ func newFilterMetricProcessor(logger *zap.Logger, cfg *Config) (*filterMetricPro

logger.Info(
"Metric filter configured",
zap.String("include match_type", includeMatchType),
zap.Strings("include expressions", includeExpressions),
zap.Strings("include metric names", includeMetricNames),
zap.String("exclude match_type", excludeMatchType),
zap.Strings("exclude expressions", excludeExpressions),
zap.Strings("exclude metric names", excludeMetricNames),
zap.String("includeMetrics match_type", includeMatchType),
zap.Strings("includeMetrics expressions", includeExpressions),
zap.Strings("includeMetrics metric names", includeMetricNames),
zap.String("excludeMetrics match_type", excludeMatchType),
zap.Strings("excludeMetrics expressions", excludeExpressions),
zap.Strings("excludeMetrics metric names", excludeMetricNames),
)

return &filterProcessor{
cfg: cfg,
includeMetrics: inc,
excludeMetrics: exc,
logger: logger,
}, nil
}

func newFilterSpanProcessor(logger *zap.Logger, cfg *Config) (*filterProcessor, error) {
inc, err := createSpansMatcher(cfg.Spans.Include)
if err != nil {
return nil, err
}

exc, err := createSpansMatcher(cfg.Spans.Exclude)
if err != nil {
return nil, err
}

includeMatchType := ""
var includeServices []string
var includeSpanNames []string
if cfg.Spans.Include != nil {
includeMatchType = string(cfg.Spans.Include.MatchType)
includeServices = cfg.Spans.Include.Services
includeSpanNames = cfg.Spans.Include.SpanNames
}

excludeMatchType := ""
var excludeServices []string
var excludeSpanNames []string
if cfg.Spans.Exclude != nil {
excludeMatchType = string(cfg.Spans.Exclude.MatchType)
excludeServices = cfg.Spans.Exclude.Services
excludeSpanNames = cfg.Spans.Exclude.SpanNames
}

logger.Info(
"Span filter configured",
zap.String("includeSpans match_type", includeMatchType),
zap.Strings("includeSpans services", includeServices),
zap.Strings("includeSpans span names", includeSpanNames),
zap.String("excludeSpans match_type", excludeMatchType),
zap.Strings("excludeSpans services", excludeServices),
zap.Strings("excludeSpans span names", excludeSpanNames),
)

return &filterMetricProcessor{
cfg: cfg,
include: inc,
exclude: exc,
logger: logger,
return &filterProcessor{
cfg: cfg,
includeSpans: inc,
excludeSpans: exc,
logger: logger,
}, nil
}

func createMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, error) {
func createSpansMatcher(mp *filterconfig.MatchProperties) (filterspan.Matcher, error) {
if mp == nil {
return nil, nil
}
return filterspan.NewMatcher(mp)
}

func createMetricsMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, error) {
// Nothing specified in configuration
if mp == nil {
return nil, nil
}
return filtermetric.NewMatcher(mp)
}

// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
// ProcessMetrics filters the given metrics based off the filterProcessor's filters.
func (fmp *filterProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
rms := pdm.ResourceMetrics()
idx := newMetricIndex()
for i := 0; i < rms.Len(); i++ {
Expand All @@ -112,9 +171,34 @@ func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Me
return idx.extract(pdm), nil
}

func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, error) {
if fmp.include != nil {
matches, err := fmp.include.MatchMetric(metric)
// ProcessTraces filters the given spans based off the filterProcessor's filters.
func (fmp *filterProcessor) ProcessTraces(_ context.Context, pdt pdata.Traces) (pdata.Traces, error) {
rs := pdt.ResourceSpans()
for i := 0; i < rs.Len(); i++ {
rss := rs.At(i)
resource := rss.Resource()
ils := rss.InstrumentationLibrarySpans()

for j := 0; j < ils.Len(); j++ {
ilss := ils.At(j)
library := ilss.InstrumentationLibrary()
inputSpans := pdata.NewSpanSlice()
ilss.Spans().MoveAndAppendTo(inputSpans)
for k := 0; k < inputSpans.Len(); k++ {
span := inputSpans.At(k)
if fmp.shouldKeepSpan(span, resource, library) {
ilss.Spans().Append(span)
}
}
}
}

return pdt, nil
}

func (fmp *filterProcessor) shouldKeepMetric(metric pdata.Metric) (bool, error) {
if fmp.includeMetrics != nil {
matches, err := fmp.includeMetrics.MatchMetric(metric)
if err != nil {
// default to keep if there's an error
return true, err
Expand All @@ -124,8 +208,8 @@ func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, e
}
}

if fmp.exclude != nil {
matches, err := fmp.exclude.MatchMetric(metric)
if fmp.excludeMetrics != nil {
matches, err := fmp.excludeMetrics.MatchMetric(metric)
if err != nil {
return true, err
}
Expand All @@ -136,3 +220,21 @@ func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) (bool, e

return true, nil
}

func (fmp *filterProcessor) shouldKeepSpan(span pdata.Span, resource pdata.Resource, library pdata.InstrumentationLibrary) bool {
if fmp.includeSpans != nil {
matches := fmp.includeSpans.MatchSpan(span, resource, library)
if !matches {
return false
}
}

if fmp.excludeSpans != nil {
matches := fmp.excludeSpans.MatchSpan(span, resource, library)
if matches {
return false
}
}

return true
}
Loading

0 comments on commit 6795013

Please sign in to comment.