From 4766920c9f1e6b77b133b33a0568147e9a8c1123 Mon Sep 17 00:00:00 2001 From: Annie Fu Date: Wed, 5 Aug 2020 01:43:20 -0700 Subject: [PATCH] If filterprocessor filters all data, stop further processing --- processor/README.md | 4 +++- processor/filterprocessor/filter_processor.go | 9 +++++++- .../filterprocessor/filter_processor_test.go | 22 ++++++++++++------- processor/processorhelper/processor.go | 8 +++++++ processor/processorhelper/processor_test.go | 6 +++++ 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/processor/README.md b/processor/README.md index fa35af11039..74ef433c7d7 100644 --- a/processor/README.md +++ b/processor/README.md @@ -134,6 +134,8 @@ are checked before the `exclude` properties. ```yaml filter: + # metrics indicates this processor applies to metrics + metrics: # include and/or exclude can be specified. However, the include properties # are always checked before the exclude properties. {include, exclude}: @@ -148,7 +150,7 @@ filter: # metric_names specify an array of items to match the metric name against. # This is a required field. - metric_name: [, ..., ] + metric_names: [, ..., ] ``` #### Match Configuration diff --git a/processor/filterprocessor/filter_processor.go b/processor/filterprocessor/filter_processor.go index 6a079ed772b..488f45ff830 100644 --- a/processor/filterprocessor/filter_processor.go +++ b/processor/filterprocessor/filter_processor.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/processor/filtermetric" + "go.opentelemetry.io/collector/processor/processorhelper" ) type filterMetricProcessor struct { @@ -62,9 +63,10 @@ func createMatcher(mp *filtermetric.MatchProperties) (*filtermetric.Matcher, err return &matcher, nil } -// ProcessMetrics filters the given spans based off the filterMetricProcessor's filters. +// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters. func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { mds := pdatautil.MetricsToMetricsData(md) + foundMetricToKeep := false for i := range mds { if len(mds[i].Metrics) == 0 { continue @@ -72,11 +74,16 @@ func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Met keep := make([]*metricspb.Metric, 0, len(mds[i].Metrics)) for _, m := range mds[i].Metrics { if fmp.shouldKeepMetric(m) { + foundMetricToKeep = true keep = append(keep, m) } } mds[i].Metrics = keep } + + if !foundMetricToKeep { + return md, processorhelper.ErrSkipProcessingData + } return pdatautil.MetricsFromMetricsData(mds), nil } diff --git a/processor/filterprocessor/filter_processor_test.go b/processor/filterprocessor/filter_processor_test.go index 451062826d1..db6c726d888 100644 --- a/processor/filterprocessor/filter_processor_test.go +++ b/processor/filterprocessor/filter_processor_test.go @@ -32,11 +32,12 @@ import ( ) type metricNameTest struct { - name string - inc *filtermetric.MatchProperties - exc *filtermetric.MatchProperties - inMN [][]*metricspb.Metric // input Metric batches - outMN [][]string // output Metric names + name string + inc *filtermetric.MatchProperties + exc *filtermetric.MatchProperties + inMN [][]*metricspb.Metric // input Metric batches + outMN [][]string // output Metric names + allMetricsFiltered bool } var ( @@ -169,8 +170,8 @@ var ( MatchType: filterset.Strict, }, }, - inMN: [][]*metricspb.Metric{metricsWithName(inMetricNames)}, - outMN: [][]string{{}}, + inMN: [][]*metricspb.Metric{metricsWithName(inMetricNames)}, + allMetricsFiltered: true, }, { name: "emptyFilterExclude", @@ -220,8 +221,13 @@ func TestFilterMetricProcessor(t *testing.T) { context.Background(), pdatautil.MetricsFromMetricsData(mds)) assert.Nil(t, cErr) - got := next.AllMetrics() + + if test.allMetricsFiltered { + require.Equal(t, 0, len(got)) + return + } + require.Equal(t, 1, len(got)) gotMD := pdatautil.MetricsToMetricsData(got[0]) require.Equal(t, len(test.outMN), len(gotMD)) diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go index d5dbbc4abe5..7db86053b0c 100644 --- a/processor/processorhelper/processor.go +++ b/processor/processorhelper/processor.go @@ -26,6 +26,11 @@ import ( "go.opentelemetry.io/collector/obsreport" ) +// ErrSkipProcessingData is a sentinel value to indicate when traces or metrics should intentionally be dropped +// from further processing in the pipeline because the data is determined to be irrelevant. A processor can return this error +// to stop further processing without propagating an error back up the pipeline to logs. +var ErrSkipProcessingData = errors.New("sentinel error to skip processing data from the remainder of the pipeline") + // Start specifies the function invoked when the processor is being started. type Start func(context.Context, component.Host) error @@ -172,6 +177,9 @@ func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics var err error md, err = mp.processor.ProcessMetrics(processorCtx, md) if err != nil { + if err == ErrSkipProcessingData { + return nil + } return err } return mp.nextConsumer.ConsumeMetrics(ctx, md) diff --git a/processor/processorhelper/processor_test.go b/processor/processorhelper/processor_test.go index 7bdf4f0ea9b..54f7120113c 100644 --- a/processor/processorhelper/processor_test.go +++ b/processor/processorhelper/processor_test.go @@ -129,6 +129,12 @@ func TestNewMetricsExporter_ProcessMetricsError(t *testing.T) { assert.Equal(t, want, me.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()))) } +func TestNewMetricsExporter_ProcessMetricsErrSkipProcessingData(t *testing.T) { + me, err := NewMetricsProcessor(testCfg, exportertest.NewNopMetricsExporter(), newTestMProcessor(ErrSkipProcessingData)) + require.NoError(t, err) + assert.Equal(t, nil, me.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()))) +} + func TestNewLogsExporter(t *testing.T) { me, err := NewLogsProcessor(testCfg, exportertest.NewNopLogsExporter(), newTestLProcessor(nil)) require.NoError(t, err)