Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend distributor trace logger with optional features to include span attributes and filter by error status #1465

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [FEATURE] Extend distributor trace logger with optional features to include span attributes and filter by error status. [#1465](https://github.com/grafana/tempo/pull/1465) (@faustodavid)
faustodavid marked this conversation as resolved.
Show resolved Hide resolved
* [CHANGE] metrics-generator: Changed added metric label `instance` to `__metrics_gen_instance` to reduce collisions with custom dimensions. [#1439](https://github.com/grafana/tempo/pull/1439) (@joe-elliott)
* [CHANGE] Don't enforce `max_bytes_per_tag_values_query` when set to 0. [#1447](https://github.com/grafana/tempo/pull/1447) (@joe-elliott)
* [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (t *App) initOverrides() (services.Service, error) {

func (t *App) initDistributor() (services.Service, error) {
// todo: make ingester client a module instead of passing the config everywhere
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.cfg.GeneratorClient, t.generatorRing, t.overrides, t.TracesConsumerMiddleware, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, t.cfg.MetricsGeneratorEnabled, prometheus.DefaultRegisterer)
distributor, err := distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.cfg.GeneratorClient, t.generatorRing, t.overrides, t.TracesConsumerMiddleware, log.Logger, t.cfg.Server.LogLevel, t.cfg.SearchEnabled, t.cfg.MetricsGeneratorEnabled, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create distributor %w", err)
}
Expand Down
10 changes: 9 additions & 1 deletion docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,15 @@ distributor:

# Optional.
# Enable to log every received trace id to help debug ingestion
[log_received_traces: <bool>]
# WARNING: Deprecated. Use log_received_spans instead.
[log_received_traces: <boolean> | default = false]

# Optional.
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs
log_received_spans:
[enabled: <boolean> | default = false]
[include_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
faustodavid marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
3 changes: 2 additions & 1 deletion docs/tempo/website/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ distributor:
instance_addr: ""
receivers: {}
override_ring_key: distributor
log_received_traces: false
log_received_spans:
enabled: false
extend_writes: true
search_tags_deny_list: []
ingester_client:
Expand Down
3 changes: 2 additions & 1 deletion integration/e2e/config-all-in-one-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ distributor:
protocols:
grpc:
zipkin:
log_received_traces: true
log_received_spans:
enabled: true

ingester:
lifecycler:
Expand Down
3 changes: 2 additions & 1 deletion integration/e2e/config-metrics-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ distributor:
jaeger:
protocols:
grpc:
log_received_traces: true
log_received_spans:
enabled: true

ingester:
lifecycler:
Expand Down
15 changes: 13 additions & 2 deletions modules/distributor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/grafana/dskit/flagext"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/tempo/pkg/util"
)

var defaultReceivers = map[string]interface{}{
Expand All @@ -31,7 +32,8 @@ type Config struct {
// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedTraces bool `yaml:"log_received_traces"`
LogReceivedTraces bool `yaml:"log_received_traces"` // Deprecated
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`

// disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
// note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
Expand All @@ -43,6 +45,12 @@ type Config struct {
factory func(addr string) (ring_client.PoolClient, error) `yaml:"-"`
}

type LogReceivedSpansConfig struct {
Enabled bool `yaml:"enabled"`
IncludeAttributes bool `yaml:"include_attributes"`
faustodavid marked this conversation as resolved.
Show resolved Hide resolved
FilterByStatusError bool `yaml:"filter_by_status_error"`
}

// RegisterFlagsAndApplyDefaults registers flags and applies defaults
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
flagext.DefaultValues(&cfg.DistributorRing)
Expand All @@ -52,5 +60,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

f.BoolVar(&cfg.LogReceivedTraces, prefix+".log-received-traces", false, "Enable to log every received trace id to help debug ingestion.")
f.BoolVar(&cfg.LogReceivedTraces, util.PrefixConfig(prefix, "log-received-traces"), false, "Enable to log every received trace id to help debug ingestion.")
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
faustodavid marked this conversation as resolved.
Show resolved Hide resolved
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
}
72 changes: 62 additions & 10 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/util/strutil"
"github.com/segmentio/fasthash/fnv1a"

"github.com/pkg/errors"
Expand All @@ -34,7 +36,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
tempo_util "github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/validation"
)

Expand Down Expand Up @@ -137,10 +139,12 @@ type Distributor struct {
// Manager for subservices
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

logger log.Logger
}

// New creates a new Distributor.
func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRing, generatorClientCfg generator_client.Config, generatorsRing ring.ReadRing, o *overrides.Overrides, middleware receiver.Middleware, loggingLevel logging.Level, searchEnabled bool, metricsGeneratorEnabled bool, reg prometheus.Registerer) (*Distributor, error) {
func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRing, generatorClientCfg generator_client.Config, generatorsRing ring.ReadRing, o *overrides.Overrides, middleware receiver.Middleware, logger log.Logger, loggingLevel logging.Level, searchEnabled bool, metricsGeneratorEnabled bool, reg prometheus.Registerer) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
Expand All @@ -156,14 +160,14 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi

if o.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy {
lifecyclerCfg := cfg.DistributorRing.ToLifecyclerConfig()
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, err
}
subservices = append(subservices, lifecycler)
ingestionRateStrategy = newGlobalIngestionRateStrategy(o, lifecycler)

ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
return nil, errors.Wrap(err, "unable to initialize distributor ring")
}
Expand All @@ -178,7 +182,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
ring_client.NewRingServiceDiscovery(ingestersRing),
factory,
metricIngesterClients,
log.Logger)
logger)

subservices = append(subservices, pool)

Expand All @@ -202,6 +206,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
globalTagsToDrop: tagsToDrop,
overrides: o,
traceEncoder: model.MustNewSegmentDecoder(model.CurrentEncoding),
logger: logger,
}

if metricsGeneratorEnabled {
Expand All @@ -213,7 +218,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
return generator_client.New(addr, generatorClientCfg)
},
metricGeneratorClients,
log.Logger,
logger,
)

subservices = append(subservices, d.generatorsPool)
Expand Down Expand Up @@ -279,8 +284,12 @@ func (d *Distributor) PushBatches(ctx context.Context, batches []*v1.ResourceSpa
return nil, err
}

if d.cfg.LogReceivedTraces {
logTraces(batches)
if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces {
if d.cfg.LogReceivedSpans.IncludeAttributes {
logSpansWithAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
} else {
logSpans(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
}
}

// metric size
Expand Down Expand Up @@ -531,11 +540,54 @@ func recordDiscaredSpans(err error, userID string, spanCount int) {
}
}

func logTraces(batches []*v1.ResourceSpans) {
// logSpans logs one line per received span, recording its span ID and
// trace ID. When filterByStatusError is true, only spans whose status
// code is STATUS_CODE_ERROR are logged; all others are skipped.
func logSpans(batches []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
	for _, b := range batches {
		for _, ils := range b.InstrumentationLibrarySpans {
			for _, s := range ils.Spans {
				// Use the generated nil-safe getters: s.Status may be nil for
				// spans that never had a status set, and a direct s.Status.Code
				// access would panic. This also matches the accessor style used
				// by logSpansWithAttributes.
				if filterByStatusError && s.GetStatus().GetCode() != v1.Status_STATUS_CODE_ERROR {
					continue
				}
				level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
			}
		}
	}
}

func logSpansWithAttributes(batch []*v1.ResourceSpans, filterByStatusError bool, logger log.Logger) {
for _, b := range batch {
logger := logger // copy logger so we can modify it per resource
faustodavid marked this conversation as resolved.
Show resolved Hide resolved

// extract resources attributes, so we only have to do it once per batch of spans
for _, a := range b.Resource.GetAttributes() {
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {
if filterByStatusError && s.Status.Code != v1.Status_STATUS_CODE_ERROR {
continue
}

logger := logger // copy logger so we can modify it per span

for _, a := range s.GetAttributes() {
logger = log.With(
logger,
"span_"+strutil.SanitizeLabelName(a.GetKey()),
tempo_util.StringifyAnyValue(a.GetValue()))
}

latencySeconds := float64(s.GetEndTimeUnixNano()-s.GetStartTimeUnixNano()) / float64(time.Second.Nanoseconds())
logger = log.With(
logger,
"span_duration_seconds", latencySeconds,
"span_kind", s.GetKind().String(),
"span_status", s.GetStatus().GetCode().String())

level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}
}
}
Expand Down
Loading