Feature: Distributor usage trackers (#4162)
* First working draft of cost attribution usage tracker

* Add missing tracker name label, more efficient batch proportioning, cleanup

* Reduce series hashing

* Fix user-configurable overrides tests for new json element

* lint

* Add per-tenant override for max cardinality

* lint, review feedback

* Default to not enabled, cleanup test config

* Explicitly check for usage_metrics handler

* review feedback

* Update tracker to support many-to-one mapping with relabel

* lint

* New behavior for missing and overflow

* Fix issue where subsequent spans would incorrectly reuse the series of the previous span if they were missing values

* Revert maps back to slices now that we can depend on a dimension always having a value

* Please ignore benchmark profiles

* Tweak config to have specific cost attribution tracker section. Update manifest and config index

* lint

* changelog

* Update api docs for new endpoint

* Review feedback

* review feedback

* Swap loop order for a tad more performance
mdisibio authored Oct 25, 2024
1 parent 094657a commit b0f06ce
Showing 19 changed files with 903 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
.idea
.vscode
*.test
*.out
*.pprof
/bin
/cmd/tempo-cli/tempo-cli
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,8 @@
## main / unreleased

* [ENHANCEMENT] The span multiplier now also sources its value from the resource attributes. [#4210](https://github.com/grafana/tempo/pull/4210)
* [FEATURE] Export cost attribution usage metrics from distributor [#4162](https://github.com/grafana/tempo/pull/4162) (@mdisibio)
* [ENHANCEMENT] Changed log level from INFO to DEBUG for the TempoDB Find operation using traceId to reduce excessive/unwanted logs in log search. [#4179](https://github.com/grafana/tempo/pull/4179) (@Aki0x137)
* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)
* [CHANGE] Add throughput and SLO metrics in the tags and tag values endpoints [#4148](https://github.com/grafana/tempo/pull/4148) (@electron0zero)
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
4 changes: 4 additions & 0 deletions cmd/tempo/app/modules.go
@@ -243,6 +243,10 @@ func (t *App) initDistributor() (services.Service, error) {
t.Server.HTTPRouter().Handle("/distributor/ring", distributor.DistributorRing)
}

if usageHandler := distributor.UsageTrackerHandler(); usageHandler != nil {
t.Server.HTTPRouter().Handle("/usage_metrics", usageHandler)
}

return t.distributor, nil
}

25 changes: 25 additions & 0 deletions docs/sources/tempo/api_docs/_index.md
@@ -38,6 +38,7 @@ For externally supported GRPC API, [see below](#tempo-grpc-api).
| Memberlist | Distributor, Ingester, Querier, Compactor | HTTP | `GET /memberlist` |
| [Flush](#flush) | Ingester | HTTP | `GET,POST /flush` |
| [Shutdown](#shutdown) | Ingester | HTTP | `GET,POST /shutdown` |
| [Usage Metrics](#usage-metrics) | Distributor | HTTP | `GET /usage_metrics` |
| [Distributor ring status](#distributor-ring-status) (*) | Distributor | HTTP | `GET /distributor/ring` |
| [Ingesters ring status](#ingesters-ring-status) | Distributor, Querier | HTTP | `GET /ingester/ring` |
| [Metrics-generator ring status](#metrics-generator-ring-status) (*) | Distributor | HTTP | `GET /metrics-generator/ring` |
@@ -684,6 +685,30 @@ ingester service.
This is usually used at the time of scaling down a cluster.
{{% /admonition %}}
### Usage metrics
{{< admonition type="note" >}}
This endpoint is only available when one or more usage trackers are enabled in [the distributor]({{< relref "../configuration#distributor" >}}).
{{% /admonition %}}
```
GET /usage_metrics
```
A special metrics scrape endpoint that provides per-tenant metrics on ingested data. Per-tenant grouping rules are configured in [the per-tenant overrides]({{< relref "../configuration#overrides" >}}).
Example:
```
curl http://localhost:3200/usage_metrics
# HELP tempo_usage_tracker_bytes_received_total bytes total received with these attributes
# TYPE tempo_usage_tracker_bytes_received_total counter
tempo_usage_tracker_bytes_received_total{service="auth-service",tenant="single-tenant",tracker="cost-attribution"} 96563
tempo_usage_tracker_bytes_received_total{service="cache",tenant="single-tenant",tracker="cost-attribution"} 81904
tempo_usage_tracker_bytes_received_total{service="gateway",tenant="single-tenant",tracker="cost-attribution"} 164751
tempo_usage_tracker_bytes_received_total{service="identity-service",tenant="single-tenant",tracker="cost-attribution"} 85974
tempo_usage_tracker_bytes_received_total{service="service-A",tenant="single-tenant",tracker="cost-attribution"} 92799
```
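Because the output is standard Prometheus exposition format, the endpoint can be scraped directly. A minimal sketch of a Prometheus scrape config follows; the job name and target host are assumptions:
```yaml
scrape_configs:
  - job_name: tempo-usage-metrics
    metrics_path: /usage_metrics    # instead of the default /metrics
    static_configs:
      - targets: ['tempo-distributor:3200']
```
From there, a query such as `sum by (service) (rate(tempo_usage_tracker_bytes_received_total[5m]))` yields the per-service ingest rate.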
### Distributor ring status
{{< admonition type="note" >}}
22 changes: 21 additions & 1 deletion docs/sources/tempo/configuration/_index.md
@@ -228,6 +228,19 @@ distributor:
# defaults to 0 which means that by default ResourceExhausted is not retried. Set this to a duration such as `1s` to
# instruct the client how to retry.
[retry_after_on_resource_exhausted: <duration> | default = '0' ]

# Optional.
# Configures usage trackers in the distributor, which expose metrics of ingested
# traffic on /usage_metrics, grouped by configurable attributes.
usage:
cost_attribution:
# Enables the "cost-attribution" usage tracker. Per-tenant attributes are configured in overrides.
[enabled: <boolean> | default = false]
# Maximum number of series per tenant.
[max_cardinality: <int> | default = 10000]
# Interval after which a series is considered stale and will be deleted from the registry.
# Once a metrics series is deleted, it won't be emitted anymore, keeping active series low.
[stale_duration: <duration> | default = 15m0s]
```
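For example, a minimal config that enables the cost-attribution tracker, spelling out the documented defaults:

```yaml
distributor:
  usage:
    cost_attribution:
      enabled: true
      max_cardinality: 10000
      stale_duration: 15m
```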
## Ingester
@@ -472,7 +485,7 @@ metrics_generator:
[collection_interval: <duration> | default = 15s]

# Interval after which a series is considered stale and will be deleted from the registry.
# Once a metrics series is deleted it won't be emitted anymore, keeping active series low.
# Once a metrics series is deleted, it won't be emitted anymore, keeping active series low.
[stale_duration: <duration> | default = 15m]

# A list of labels that will be added to all generated metrics.
@@ -1719,6 +1732,13 @@ overrides:
scope: <string> # scope of the attribute. options: resource, span
]
# Cost attribution usage tracker configuration
cost_attribution:
# List of attributes to group ingested data by. The map value is optional and can
# be used to rename and combine attributes (see the example below).
dimensions: <map string to string>
# Tenant-specific overrides settings configuration file. The empty string (default
# value) disables using an overrides file.
[per_tenant_override_config: <string> | default = ""]
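A minimal sketch of the `dimensions` map in use. The attribute names and the exact nesting under `overrides` are illustrative, and the behavior of an empty value (keeping the attribute's own name) is an assumption based on the description above:

```yaml
overrides:
  cost_attribution:
    dimensions:
      # Group by service.name, emitted under the label `service` (rename).
      service.name: service
      # An empty value keeps the attribute's own name (assumption based on
      # "the map value is optional" above).
      team: ""
```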
7 changes: 6 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
@@ -18,7 +18,7 @@ go run ./cmd/tempo --storage.trace.backend=local --storage.trace.local.path=/var
## Complete configuration

{{< admonition type="note" >}}
This manifest was generated on 2024-10-11.
This manifest was generated on 2024-10-21.
{{% /admonition %}}

```yaml
@@ -186,6 +186,11 @@ distributor:
receivers: {}
override_ring_key: distributor
forwarders: []
usage:
cost_attribution:
enabled: false
max_cardinality: 10000
stale_duration: 15m0s
extend_writes: true
retry_after_on_resource_exhausted: 0s
ingester_client:
7 changes: 5 additions & 2 deletions modules/distributor/config.go
@@ -8,6 +8,7 @@ import (
ring_client "github.com/grafana/dskit/ring/client"

"github.com/grafana/tempo/modules/distributor/forwarder"
"github.com/grafana/tempo/modules/distributor/usage"
"github.com/grafana/tempo/pkg/util"
)

@@ -37,8 +38,8 @@ type Config struct {
LogReceivedSpans LogSpansConfig `yaml:"log_received_spans,omitempty"`
LogDiscardedSpans LogSpansConfig `yaml:"log_discarded_spans,omitempty"`
MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"`

Forwarders forwarder.ConfigList `yaml:"forwarders"`
Forwarders forwarder.ConfigList `yaml:"forwarders"`
Usage usage.Config `yaml:"usage,omitempty"`

// disables write extension with inactive ingesters. Use this along with ingester.lifecycler.unregister_on_shutdown = true
// note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
@@ -80,4 +81,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.LogDiscardedSpans.Enabled, util.PrefixConfig(prefix, "log-discarded-spans.enabled"), false, "Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-discarded-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-discarded-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")

cfg.Usage.RegisterFlagsAndApplyDefaults(prefix, f)
}
26 changes: 26 additions & 0 deletions modules/distributor/distributor.go
@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"math"
"net/http"
"sync"
"time"

@@ -28,6 +29,7 @@ import (

"github.com/grafana/tempo/modules/distributor/forwarder"
"github.com/grafana/tempo/modules/distributor/receiver"
"github.com/grafana/tempo/modules/distributor/usage"
generator_client "github.com/grafana/tempo/modules/generator/client"
ingester_client "github.com/grafana/tempo/modules/ingester/client"
"github.com/grafana/tempo/modules/overrides"
@@ -154,6 +156,8 @@ type Distributor struct {
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

usage *usage.Tracker

logger log.Logger
}

@@ -214,6 +218,14 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
logger: logger,
}

if cfg.Usage.CostAttribution.Enabled {
usage, err := usage.NewTracker(cfg.Usage.CostAttribution, "cost-attribution", o.CostAttributionDimensions, o.CostAttributionMaxCardinality)
if err != nil {
return nil, fmt.Errorf("creating usage tracker: %w", err)
}
d.usage = usage
}

var generatorsPoolFactory ring_client.PoolAddrFunc = func(addr string) (ring_client.PoolClient, error) {
return generator_client.New(addr, generatorClientCfg)
}
@@ -328,6 +340,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
return &tempopb.PushResponse{}, nil
}
// check limits
// todo - usage tracker include discarded bytes?
err = d.checkForRateLimits(size, spanCount, userID)
if err != nil {
return nil, err
@@ -360,6 +373,11 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
statBytesReceived.Inc(int64(size))
statSpansReceived.Inc(int64(spanCount))

// Usage tracking
if d.usage != nil {
d.usage.Observe(userID, batches)
}

keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
@@ -498,6 +516,14 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) UsageTrackerHandler() http.Handler {
if d.usage != nil {
return d.usage.Handler()
}

return nil
}

// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
// and traces to pass onto the ingesters.
func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) {
30 changes: 30 additions & 0 deletions modules/distributor/usage/config.go
@@ -0,0 +1,30 @@
package usage

import (
"flag"
"time"
)

const (
defaultMaxCardinality = uint64(10000)
defaultStaleDuration = 15 * time.Minute
defaultPurgePeriod = time.Minute
)

type PerTrackerConfig struct {
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"`
MaxCardinality uint64 `yaml:"max_cardinality,omitempty" json:"max_cardinality,omitempty"`
StaleDuration time.Duration `yaml:"stale_duration,omitempty" json:"stale_duration,omitempty"`
}

type Config struct {
CostAttribution PerTrackerConfig `yaml:"cost_attribution,omitempty" json:"cost_attribution,omitempty"`
}

func (c *Config) RegisterFlagsAndApplyDefaults(_ string, _ *flag.FlagSet) {
c.CostAttribution = PerTrackerConfig{
Enabled: false,
MaxCardinality: defaultMaxCardinality,
StaleDuration: defaultStaleDuration,
}
}
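
For reference, a minimal sketch of how these defaults could be wired up outside the distributor. The callback signatures passed to `usage.NewTracker` (per-tenant dimensions and cardinality limit) and the standalone `main` are assumptions; in Tempo itself, `New` in distributor.go constructs the tracker from the overrides module and `initDistributor` registers the handler:

```go
package main

import (
	"flag"
	"net/http"

	"github.com/grafana/tempo/modules/distributor/usage"
)

func main() {
	// Apply the documented defaults (max_cardinality=10000, stale_duration=15m),
	// then enable the tracker.
	var cfg usage.Config
	cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("usage", flag.PanicOnError))
	cfg.CostAttribution.Enabled = true

	// Per-tenant callbacks; in the distributor these come from the overrides
	// module (o.CostAttributionDimensions / o.CostAttributionMaxCardinality).
	// Their exact signatures are assumptions here.
	dimensions := func(tenant string) map[string]string {
		return map[string]string{"service.name": ""}
	}
	maxCardinality := func(tenant string) uint64 {
		return 0 // assumption: 0 defers to the configured max_cardinality
	}

	tracker, err := usage.NewTracker(cfg.CostAttribution, "cost-attribution", dimensions, maxCardinality)
	if err != nil {
		panic(err)
	}

	// The distributor records ingested batches with tracker.Observe(tenant, batches)
	// inside PushTraces; here we only expose the scrape endpoint.
	http.Handle("/usage_metrics", tracker.Handler())
	_ = http.ListenAndServe(":3200", nil)
}
```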