Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip exemplars for instant queries #4204

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
* [CHANGE] **BREAKING CHANGE** Migrate from OpenTracing to OpenTelemetry instrumentation. Removed the `use_otel_tracer` configuration option. Use the OpenTelemetry environment variables to configure the span exporter [#3646](https://github.com/grafana/tempo/pull/3646) (@andreasgerstmayr)
Expand Down
4 changes: 3 additions & 1 deletion docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ Parameters:
Optional. Can be used instead of `start` and `end` to define the time range in relative values. For example `since=15m` will query the last 15 minutes. Default is last 1 hour.
- `step = (duration string)`
Optional. Defines the granularity of the returned time-series. For example `step=15s` will return a data point every 15s within the time range. If not specified then the default behavior will choose a dynamic step based on the time range.
- `exemplars = (integer)`
Optional. Defines the maximun number of exemplars for the query. It will be trimmed to max_exemplars if exceed it.

The API is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.
Expand All @@ -608,7 +610,7 @@ Actual API parameters must be url-encoded. This example is left unencoded for re
{{% /admonition %}}

```
GET /api/metrics/query_range?q={resource.service.name="myservice"}|rate()&since=3h&step=1m
GET /api/metrics/query_range?q={resource.service.name="myservice"} | min_over_time() with(exemplars=true) &since=3h&step=1m&exemplars=100
```

#### Instant
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,9 @@ query_frontend:
# 0 disables this limit.
[max_duration: <duration> | default = 3h ]

# Maximun number of exemplars per range query. Limited to 100.
[max_exemplars: <int> | default = 100 ]

# query_backend_after controls where the query-frontend searches for traces.
# Time ranges older than query_backend_after will be searched in the backend/object storage only.
# Time ranges between query_backend_after and now will be queried from the metrics-generators.
Expand All @@ -677,6 +680,7 @@ query_frontend:
# If set to a non-zero value, it's value will be used to decide if query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]

```

## Querier
Expand Down
4 changes: 2 additions & 2 deletions docs/sources/tempo/traceql/metrics-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ Exemplars are a powerful feature of TraceQL metrics.
They allow you to see an exact trace that contributed to a given metric value.
This is particularly useful when you want to understand why a given metric is high or low.

Exemplars are available in TraceQL metrics for all functions.
To get exemplars, you need to configure it in the query-frontend with the parameter `query_frontend.metrics.exemplars`,
Exemplars are available in TraceQL metrics for all range queries.
To get exemplars, you need to configure it in the query-frontend with the parameter `query_frontend.metrics.max_exemplars`,
or pass a query hint in your query.

```
Expand Down
1 change: 0 additions & 1 deletion modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
Exemplars: false, // TODO: Remove?
MaxExemplars: 100,
},
SLO: slo,
Expand Down
15 changes: 13 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,18 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, false, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

queryInstantPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, true, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand All @@ -169,7 +180,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
searchTagValues := newTagValuesHTTPHandler(cfg, searchTagValuesPipeline, o, logger)
searchTagValuesV2 := newTagValuesV2HTTPHandler(cfg, searchTagValuesPipeline, o, logger)
metrics := newMetricsSummaryHandler(metricsPipeline, logger)
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryRangePipeline, logger) // Reuses the same pipeline
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryInstantPipeline, logger) // Reuses the same pipeline
queryRange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)

return &QueryFrontend{
Expand Down
52 changes: 33 additions & 19 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
)

type queryRangeSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
instantMode bool
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
}

type QueryRangeSharderConfig struct {
Expand All @@ -39,28 +40,32 @@ type QueryRangeSharderConfig struct {
MaxDuration time.Duration `yaml:"max_duration"`
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
Exemplars bool `yaml:"exemplars,omitempty"`
MaxExemplars int `yaml:"max_exemplars,omitempty"`
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, instantMode bool, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return queryRangeSharder{
next: next,
reader: reader,
overrides: o,

cfg: cfg,
logger: logger,
next: next,
reader: reader,
overrides: o,
instantMode: instantMode,
cfg: cfg,
logger: logger,
}
})
}

func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
r := pipelineRequest.HTTPRequest()
spanName := "frontend.QueryRangeSharder.range"

if s.instantMode {
spanName = "frontend.QueryRangeSharder.instant"
}

ctx, span := tracer.Start(r.Context(), "frontend.QueryRangeSharder")
ctx, span := tracer.Start(r.Context(), spanName)
defer span.End()

req, err := api.ParseQueryRangeRequest(r)
Expand Down Expand Up @@ -92,6 +97,16 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(err), nil
}

var maxExemplars uint32
// Instant queries must not compute exemplars
if !s.instantMode && s.cfg.MaxExemplars > 0 {
maxExemplars = req.Exemplars
if maxExemplars == 0 || maxExemplars > uint32(s.cfg.MaxExemplars) {
maxExemplars = uint32(s.cfg.MaxExemplars) // Enforce configuration
}
}
req.Exemplars = maxExemplars

var (
allowUnsafe = s.overrides.UnsafeQueryHints(tenantID)
targetBytesPerRequest = s.jobSize(expr, allowUnsafe)
Expand Down Expand Up @@ -150,11 +165,11 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

func (s *queryRangeSharder) exemplarsPerShard(total uint32) uint32 {
if !s.cfg.Exemplars {
func (s *queryRangeSharder) exemplarsPerShard(total uint32, exemplars uint32) uint32 {
if exemplars == 0 {
return 0
}
return uint32(math.Ceil(float64(s.cfg.MaxExemplars)*1.2)) / total
return uint32(math.Ceil(float64(exemplars)*1.2)) / total
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
Expand Down Expand Up @@ -209,7 +224,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
queryHash := hashForQueryRangeRequest(&searchReq)
colsToJSON := api.NewDedicatedColumnsToJSON()

exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas)))
exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas)), searchReq.Exemplars)
for _, m := range metas {
if m.EndTime.Before(m.StartTime) {
// Ignore blocks with bad timings from debugging
Expand Down Expand Up @@ -299,7 +314,6 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin
}

searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators
Expand Down
Loading