Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filters support to generic forwarding #2742

Merged
merged 8 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Assert ingestion rate limits as early as possible [#2640](https://github.com/grafana/tempo/pull/2703) (@mghildiy)
* [ENHANCEMENT] Add several metrics-generator fields to user-configurable overrides [#2711](https://github.com/grafana/tempo/pull/2711) (@kvrhdn)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [FEATURE] Add filtering support to Generic Forwarding [#2742](https://github.com/grafana/tempo/pull/2742) (@Blinkuu)
yvrhdn marked this conversation as resolved.
Show resolved Hide resolved

## v2.2.0 / 2023-07-31

Expand Down
9 changes: 9 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ distributor:
# Optional.
# Path to the TLS certificate. This field must be set if insecure = false.
[cert_file: <string | default = "">]

# Optional.
# Configures filtering in forwarder that lets you drop spans and span events using
# the OpenTelemetry Transformation Language (OTTL) syntax. For detailed overview of
# the OTTL syntax, please refer to the official Open Telemetry documentation.
filter:
traces:
span: <list of string>
spanevent: <list of string>
- (repetition of above...)


Expand Down
4 changes: 1 addition & 3 deletions docs/sources/tempo/operations/generic_forwarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ weight: 130

# Generic forwarding

Generic forwarding allows asynchronous replication of ingested traces.
The distributor writes received spans to both the ingester and defined endpoints, if enabled.
This feature works in a "best-effort" manner, meaning that no retries happen if an error occurs during replication.
Generic forwarding allows asynchronous replication of ingested traces. The distributor writes received spans to both the ingester and defined endpoints, if enabled. This feature works in a "best-effort" manner, meaning that no retries happen if an error occurs during replication.

>**Warning:** Generic forwarding does not work retroactively. Once enabled, the distributor only replicates freshly ingested spans.
Blinkuu marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ require (
github.com/googleapis/gax-go/v2 v2.7.0
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.74.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.74.0
github.com/parquet-go/parquet-go v0.17.0
github.com/stoewer/parquet-cli v0.0.5
go.opentelemetry.io/collector/exporter v0.74.0
Expand All @@ -125,9 +127,11 @@ require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/alecthomas/participle/v2 v2.0.0-beta.5 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/antonmedv/expr v1.12.3 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/armon/go-metrics v0.4.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
Expand Down Expand Up @@ -159,6 +163,7 @@ require (
github.com/go-openapi/strfmt v0.21.3 // indirect
github.com/go-openapi/swag v0.22.1 // indirect
github.com/go-openapi/validate v0.22.0 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/google/btree v1.0.1 // indirect
Expand All @@ -180,6 +185,7 @@ require (
github.com/hashicorp/memberlist v0.3.1 // indirect
github.com/hashicorp/serf v0.9.8 // indirect
github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand Down Expand Up @@ -215,7 +221,9 @@ require (
github.com/oklog/ulid v1.3.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.74.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.74.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20220512140940-7b36cea86235 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2ojoH/0=
github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s=
github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
github.com/alecthomas/participle/v2 v2.0.0-beta.5 h1:y6dsSYVb1G5eK6mgmy+BgI3Mw35a3WghArZ/Hbebrjo=
github.com/alecthomas/participle/v2 v2.0.0-beta.5/go.mod h1:RC764t6n4L8D8ITAJv0qdokritYSNR3wV5cVwmIEaMM=
github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand All @@ -458,6 +460,8 @@ github.com/alicebob/miniredis/v2 v2.21.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antonmedv/expr v1.12.3 h1:bQwNFbmpIXKY/v4ZKuA4nPGuvuBVd9/zKiGS5ZsPePI=
github.com/antonmedv/expr v1.12.3/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU=
github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg=
github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down Expand Up @@ -691,6 +695,8 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down Expand Up @@ -918,6 +924,7 @@ github.com/hetznercloud/hcloud-go v1.35.3 h1:WCmFAhLRooih2QHAsbCbEdpIHnshQQmrPqs
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs=
github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E=
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -1105,16 +1112,23 @@ github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporte
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.74.0 h1:SUlZZPqhPs0FUtq8ck07P2jUnxiNY6iGdqoKVSVSoOU=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.74.0 h1:vU5ZebauzCuYNXFlQaWaYnOfjoOAnS+Sc8+oNWoHkbM=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.74.0/go.mod h1:TEu3TnUv1TuyHtjllrUDQ/ImpyD+GrkDejZv4hxl3G8=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.74.0 h1:MnN05OtFcx37fUILRRFji5zYj9F3PNQN2lvdMqEqlx4=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.74.0/go.mod h1:27sbUKPQtoOh+gS25dWXx9x54axhqV5R7EN/0TOO0NQ=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.74.0 h1:COFBWXiWnhRs9x1oYJbDg5cyiNAozp8sycriD9+1/7E=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.74.0/go.mod h1:cAKlYKU+/8mk6ETOnD+EAi5gpXZjDrGweAB9YTYrv/g=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.74.0 h1:HUDDdk1cjAYu4unzCq3PC8j+zuOsBIkLMFXjuOW975Q=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.74.0/go.mod h1:ste/ffn1fzB4Iaf/wIMsIvyFzIUkc/Rn6/f/Cf8FAoE=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.74.0 h1:DmOc+i5N1Ut23tJnHJUIcne5JWnYh6x2VL7YG4PP+tg=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.74.0 h1:9zWdiR9+bem0LvvWWoMZU6R3xTmu0WbcAPe8kI/jpyk=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.74.0/go.mod h1:3RViz8fguswWAFR+8W2Kzmch3eecOVK935QVsBdpUk4=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.74.0 h1:ww1pPXfAM0WHsymQnsN+s4B9DgwQC+GyoBq0t27JV/k=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.74.0/go.mod h1:OpEw7tyCg+iG1ywEgZ03qe5sP/8fhYdtWCMoqA8JCug=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.74.0 h1:0Fh6OjlUB9HlnX90/gGiyyFvnmNBv6inj7bSaVqQ7UQ=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.74.0/go.mod h1:13ekplz1UmvK99Vz2VjSBWPYqoRBEax5LPmA1tFHnhA=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.74.0 h1:A5xoBaMHX1WzLfvlqK6NBXq4XIbuSVJIpec5r6PDE7U=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.74.0/go.mod h1:TJT7HkhFPrJic30Vk4seF/eRk8sa0VQ442Xq/qd+DLY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.74.0 h1:11kk61GJqLQ9lsveDUBfvJ3aN/Eq3nkDZoE7fzjztDY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.74.0/go.mod h1:Yt0Ve0tOdKE30Qu6pHGRXYn3FgCKlqaY/lrbDb2j/+8=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.74.0 h1:pWNSPCKD+V4rC+MnZj8uErEbcsYUpEqU3InNYyafAPY=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.74.0/go.mod h1:0lXcDf6LUbtDxZZO3zDbRzMuL7gL1Q0FPOR8/3IBwaQ=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.74.0 h1:NWd9+rQTd6pELLf3copo7CEuNgKp90kgyhPozpwax2U=
Expand Down
2 changes: 1 addition & 1 deletion modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
d.generatorForwarder = newGeneratorForwarder(logger, d.sendToGenerators, o)
subservices = append(subservices, d.generatorForwarder)

forwardersManager, err := forwarder.NewManager(d.cfg.Forwarders, logger, o)
forwardersManager, err := forwarder.NewManager(d.cfg.Forwarders, logger, o, loggingLevel)
if err != nil {
return nil, fmt.Errorf("failed to create forwarders manager: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions modules/distributor/forwarder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ type Config struct {
Name string `yaml:"name"`
Backend string `yaml:"backend"`
OTLPGRPC otlpgrpc.Config `yaml:"otlpgrpc"`
Filter FilterConfig `yaml:"filter"`
}

type FilterConfig struct {
Traces TraceFiltersConfig `yaml:"traces"`
}

type TraceFiltersConfig struct {
SpanConditions []string `yaml:"span"`
SpanEventConditions []string `yaml:"spanevent"`
}

func (cfg *Config) Validate() error {
Expand Down
173 changes: 171 additions & 2 deletions modules/distributor/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,26 @@ package forwarder
import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/go-kit/log"
zaplogfmt "github.com/jsternberg/zap-logfmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"github.com/sirupsen/logrus"
"github.com/weaveworks/common/logging"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/grafana/tempo/modules/distributor/forwarder/otlpgrpc"
)
Expand All @@ -31,11 +46,12 @@ func (l List) ForwardTraces(ctx context.Context, traces ptrace.Traces) error {
return multierr.Combine(errs...)
}

func New(cfg Config, logger log.Logger) (Forwarder, error) {
func New(cfg Config, logger log.Logger, logLevel logging.Level) (Forwarder, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("failed to validate config: %w", err)
}

var forwarder Forwarder
switch cfg.Backend {
case OTLPGRPCBackend:
f, err := otlpgrpc.NewForwarder(cfg.OTLPGRPC, logger)
Expand All @@ -49,8 +65,161 @@ func New(cfg Config, logger log.Logger) (Forwarder, error) {
return nil, fmt.Errorf("failed to dial: %w", err)
}

return f, nil
forwarder = f
default:
return nil, fmt.Errorf("%s backend is not supported", cfg.Backend)
}

if len(cfg.Filter.Traces.SpanConditions) > 0 || len(cfg.Filter.Traces.SpanEventConditions) > 0 {
return NewFilterForwarder(cfg.Filter, forwarder, logLevel)
}

return forwarder, nil
}

type FilterForwarder struct {
filterProcessor processor.Traces
next Forwarder
fatalError error
fatalErrorMu sync.RWMutex
}

func NewFilterForwarder(cfg FilterConfig, next Forwarder, logLevel logging.Level) (*FilterForwarder, error) {
factory := filterprocessor.NewFactory()

set := processor.CreateSettings{
ID: component.ID{},
TelemetrySettings: component.TelemetrySettings{
Logger: newLogger(logLevel),
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
},
BuildInfo: component.BuildInfo{},
}
fpCfg := &filterprocessor.Config{
ErrorMode: ottl.IgnoreError,
Traces: filterprocessor.TraceFilters{
SpanConditions: cfg.Traces.SpanConditions,
SpanEventConditions: cfg.Traces.SpanEventConditions,
},
}
fp, err := factory.CreateTracesProcessor(context.Background(), set, fpCfg, consumerToForwarderAdapter{forwarder: next})
if err != nil {
return nil, fmt.Errorf("failed to create filter processor: %w", err)
}

f := &FilterForwarder{
filterProcessor: fp,
next: next,
fatalError: nil,
fatalErrorMu: sync.RWMutex{},
}

if err := f.filterProcessor.Start(context.TODO(), f); err != nil {
return nil, fmt.Errorf("failed to start filter processor: %w", err)
}

return f, nil
}

func (f *FilterForwarder) ForwardTraces(ctx context.Context, traces ptrace.Traces) error {
f.fatalErrorMu.RLock()
fatalErr := f.fatalError
f.fatalErrorMu.RUnlock()

if fatalErr != nil {
return fmt.Errorf("fatal error occurred in filter forwarder: %w", fatalErr)
}

// Copying the traces to avoid mutating the original.
tracesCopy := ptrace.NewTraces()
traces.CopyTo(tracesCopy)

err := f.filterProcessor.ConsumeTraces(ctx, tracesCopy)
if err != nil {
return fmt.Errorf("failed to filter traces: %w", err)
}

return nil
}

func (f *FilterForwarder) Shutdown(ctx context.Context) error {
var errs []error

if err := f.filterProcessor.Shutdown(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown filter processor: %w", err))
}

if err := f.next.Shutdown(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown next forwarder: %w", err))
}

return multierr.Combine(errs...)
}

// ReportFatalError implements component.Host
func (f *FilterForwarder) ReportFatalError(err error) {
f.fatalErrorMu.Lock()
f.fatalError = err
f.fatalErrorMu.Unlock()
}

// GetFactory implements component.Host
func (f *FilterForwarder) GetFactory(component.Kind, component.Type) component.Factory {
return nil
}

// GetExtensions implements component.Host
func (f *FilterForwarder) GetExtensions() map[component.ID]extension.Extension {
return nil
}

// GetExporters implements component.Host
func (f *FilterForwarder) GetExporters() map[component.DataType]map[component.ID]component.Component {
return nil
}

type consumerToForwarderAdapter struct {
forwarder Forwarder
}

func (c consumerToForwarderAdapter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return c.forwarder.ForwardTraces(ctx, td)
}

func (c consumerToForwarderAdapter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func newLogger(level logging.Level) *zap.Logger {
zapLevel := zapcore.InfoLevel

switch level.Logrus {
case logrus.PanicLevel:
zapLevel = zapcore.PanicLevel
case logrus.FatalLevel:
zapLevel = zapcore.FatalLevel
case logrus.ErrorLevel:
zapLevel = zapcore.ErrorLevel
case logrus.WarnLevel:
zapLevel = zapcore.WarnLevel
case logrus.InfoLevel:
zapLevel = zapcore.InfoLevel
case logrus.TraceLevel, logrus.DebugLevel:
zapLevel = zapcore.DebugLevel
}

config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(time.RFC3339))
}
logger := zap.New(zapcore.NewCore(
zaplogfmt.NewEncoder(config),
os.Stdout,
zapLevel,
))
logger = logger.With(zap.String("component", "tempo"))
logger.Info("filter forwarder logger initialized")

return logger
}
Loading