fix: skip exemplars for instant queries (#4204)
Currently, exemplars are computed for both query_range and instant queries. Since exemplars are not supported for instant queries (same as Prometheus), we can skip that computation and save the time.

It also fixes a bug where the exemplars query param was not being honored.

Co-authored-by: Martin Disibio <[email protected]>
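
The behavior described in the commit message can be sketched as a single function. This is a minimal illustration that mirrors the limit logic added to `RoundTrip` in this commit. The name `effectiveExemplars` and its standalone signature are assumptions made for the example, not part of the Tempo code base.

```go
package main

import "fmt"

// effectiveExemplars is a hypothetical standalone helper (not a Tempo API).
// It returns how many exemplars a query should compute:
//   - instant queries never compute exemplars,
//   - a non-positive configured maximum disables exemplars,
//   - a missing (0) or too-large request is set to the configured maximum.
func effectiveExemplars(requested uint32, configMax int, instantMode bool) uint32 {
	if instantMode || configMax <= 0 {
		return 0
	}
	limit := uint32(configMax)
	if requested == 0 || requested > limit {
		return limit // enforce configuration
	}
	return requested
}

func main() {
	fmt.Println(effectiveExemplars(10, 100, false))  // request below the limit is honored
	fmt.Println(effectiveExemplars(500, 100, false)) // request above the limit is trimmed
	fmt.Println(effectiveExemplars(0, 100, false))   // no request falls back to the limit
	fmt.Println(effectiveExemplars(10, 100, true))   // instant query: no exemplars
}
```

Passing the mode as a boolean keeps one code path for both query types, which matches how the commit reuses the same sharder with an `instantMode` flag.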
javiermolinar and mdisibio authored Oct 25, 2024
1 parent b0f06ce commit de5cb00
Showing 8 changed files with 56 additions and 28 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -46,7 +46,7 @@
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdate to the previous minute the initial sample when the series is new [#44236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)

* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
# v2.6.1

* [CHANGE] **BREAKING CHANGE** tempo-query is no longer a Jaeger instance with grpcPlugin. It's now a standalone server. Serving a gRPC API for Jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
4 changes: 3 additions & 1 deletion docs/sources/tempo/api_docs/_index.md
@@ -598,6 +598,8 @@ Parameters:
Optional. Can be used instead of `start` and `end` to define the time range in relative values. For example `since=15m` will query the last 15 minutes. Default is last 1 hour.
- `step = (duration string)`
Optional. Defines the granularity of the returned time-series. For example `step=15s` will return a data point every 15s within the time range. If not specified then the default behavior will choose a dynamic step based on the time range.
- `exemplars = (integer)`
Optional. Defines the maximum number of exemplars for the query. It will be trimmed to max_exemplars if it exceeds that limit.
The API is available in the query frontend service in
a microservices deployment, or the Tempo endpoint in a monolithic mode deployment.
@@ -609,7 +611,7 @@ Actual API parameters must be url-encoded. This example is left unencoded for re
{{% /admonition %}}
```
GET /api/metrics/query_range?q={resource.service.name="myservice"}|rate()&since=3h&step=1m
GET /api/metrics/query_range?q={resource.service.name="myservice"} | min_over_time() with(exemplars=true) &since=3h&step=1m&exemplars=100
```
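
Since the documented example is left unencoded for readability, the following sketch shows one way a client might build the encoded request URL with Go's standard `net/url` package. The helper name `buildQueryRangeURL` and the base address `http://localhost:3200` are assumptions for illustration only. The path and parameter names come from the documentation above.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// buildQueryRangeURL is a hypothetical helper (not part of Tempo) that
// url-encodes the query_range parameters described in the API docs.
func buildQueryRangeURL(base, traceQL, since, step string, exemplars int) string {
	params := url.Values{}
	params.Set("q", traceQL)
	params.Set("since", since)
	params.Set("step", step)
	params.Set("exemplars", strconv.Itoa(exemplars))
	// Encode escapes each value and sorts the parameters by key.
	return base + "/api/metrics/query_range?" + params.Encode()
}

func main() {
	// The base address is an assumption; use your query-frontend or Tempo endpoint.
	u := buildQueryRangeURL(
		"http://localhost:3200",
		`{resource.service.name="myservice"} | rate()`,
		"3h", "1m", 100,
	)
	fmt.Println(u)
}
```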
#### Instant
4 changes: 4 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -678,6 +678,9 @@ query_frontend:
# 0 disables this limit.
[max_duration: <duration> | default = 3h ]

# Maximum number of exemplars per range query. Limited to 100.
[max_exemplars: <int> | default = 100 ]

# query_backend_after controls where the query-frontend searches for traces.
# Time ranges older than query_backend_after will be searched in the backend/object storage only.
# Time ranges between query_backend_after and now will be queried from the metrics-generators.
@@ -694,6 +697,7 @@ query_frontend:
# If set to a non-zero value, its value will be used to decide if query is within SLO or not.
# Query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_slo bytes/s data.
[throughput_bytes_slo: <float> | default = 0 ]

```

## Querier
4 changes: 2 additions & 2 deletions docs/sources/tempo/traceql/metrics-queries.md
@@ -37,8 +37,8 @@ Exemplars are a powerful feature of TraceQL metrics.
They allow you to see an exact trace that contributed to a given metric value.
This is particularly useful when you want to understand why a given metric is high or low.

Exemplars are available in TraceQL metrics for all functions.
To get exemplars, you need to configure it in the query-frontend with the parameter `query_frontend.metrics.exemplars`,
Exemplars are available in TraceQL metrics for all range queries.
To get exemplars, you need to configure it in the query-frontend with the parameter `query_frontend.metrics.max_exemplars`,
or pass a query hint in your query.

```
2 changes: 0 additions & 2 deletions integration/e2e/config-query-range.yaml
@@ -9,8 +9,6 @@ query_frontend:
search:
query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
query_ingesters_until: 0
metrics:
exemplars: true

distributor:
receivers:
1 change: 0 additions & 1 deletion modules/frontend/config.go
@@ -93,7 +93,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
Exemplars: false, // TODO: Remove?
MaxExemplars: 100,
},
SLO: slo,
15 changes: 13 additions & 2 deletions modules/frontend/frontend.go
@@ -156,7 +156,18 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, false, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

queryInstantPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, true, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
@@ -169,7 +180,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
searchTagValues := newTagValuesHTTPHandler(cfg, searchTagValuesPipeline, o, logger)
searchTagValuesV2 := newTagValuesV2HTTPHandler(cfg, searchTagValuesPipeline, o, logger)
metrics := newMetricsSummaryHandler(metricsPipeline, logger)
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryRangePipeline, logger) // Reuses the same pipeline
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryInstantPipeline, logger) // Reuses the same pipeline
queryRange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)

return &QueryFrontend{
52 changes: 33 additions & 19 deletions modules/frontend/metrics_query_range_sharder.go
@@ -26,11 +26,12 @@ import (
)

type queryRangeSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
instantMode bool
}

type QueryRangeSharderConfig struct {
@@ -39,28 +40,32 @@ type QueryRangeSharderConfig struct {
MaxDuration time.Duration `yaml:"max_duration"`
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
Exemplars bool `yaml:"exemplars,omitempty"`
MaxExemplars int `yaml:"max_exemplars,omitempty"`
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, instantMode bool, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return queryRangeSharder{
next: next,
reader: reader,
overrides: o,

cfg: cfg,
logger: logger,
next: next,
reader: reader,
overrides: o,
instantMode: instantMode,
cfg: cfg,
logger: logger,
}
})
}

func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
r := pipelineRequest.HTTPRequest()
spanName := "frontend.QueryRangeSharder.range"

if s.instantMode {
spanName = "frontend.QueryRangeSharder.instant"
}

ctx, span := tracer.Start(r.Context(), "frontend.QueryRangeSharder")
ctx, span := tracer.Start(r.Context(), spanName)
defer span.End()

req, err := api.ParseQueryRangeRequest(r)
@@ -92,6 +97,16 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(err), nil
}

var maxExemplars uint32
// Instant queries must not compute exemplars
if !s.instantMode && s.cfg.MaxExemplars > 0 {
maxExemplars = req.Exemplars
if maxExemplars == 0 || maxExemplars > uint32(s.cfg.MaxExemplars) {
maxExemplars = uint32(s.cfg.MaxExemplars) // Enforce configuration
}
}
req.Exemplars = maxExemplars

var (
allowUnsafe = s.overrides.UnsafeQueryHints(tenantID)
targetBytesPerRequest = s.jobSize(expr, allowUnsafe)
@@ -150,11 +165,11 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

func (s *queryRangeSharder) exemplarsPerShard(total uint32) uint32 {
if !s.cfg.Exemplars {
func (s *queryRangeSharder) exemplarsPerShard(total uint32, exemplars uint32) uint32 {
if exemplars == 0 {
return 0
}
return uint32(math.Ceil(float64(s.cfg.MaxExemplars)*1.2)) / total
return uint32(math.Ceil(float64(exemplars)*1.2)) / total
}
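
To make the per-shard arithmetic above concrete, here is a small standalone sketch of the same formula: the requested exemplar count is padded by 20%, rounded up, and split across shards with integer division. The free function form and the guard against zero shards are assumptions added for the example. The method in the diff has no such guard.

```go
package main

import (
	"fmt"
	"math"
)

// exemplarsPerShard is a standalone sketch of the formula in the diff.
// The totalShards == 0 check is an addition for this example, because
// integer division by zero panics in Go.
func exemplarsPerShard(totalShards, exemplars uint32) uint32 {
	if exemplars == 0 || totalShards == 0 {
		return 0
	}
	// Pad by 20%, round up, then divide evenly with integer division.
	return uint32(math.Ceil(float64(exemplars)*1.2)) / totalShards
}

func main() {
	fmt.Println(exemplarsPerShard(8, 100))   // padded budget split across 8 shards
	fmt.Println(exemplarsPerShard(200, 100)) // more shards than padded budget
	fmt.Println(exemplarsPerShard(8, 0))     // exemplars disabled
}
```

Because the division truncates, the result drops to zero once the number of shards exceeds the padded exemplar budget.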

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
@@ -209,7 +224,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
queryHash := hashForQueryRangeRequest(&searchReq)
colsToJSON := api.NewDedicatedColumnsToJSON()

exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas)))
exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas)), searchReq.Exemplars)
for _, m := range metas {
if m.EndTime.Before(m.StartTime) {
// Ignore blocks with bad timings from debugging
@@ -299,7 +314,6 @@ func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID strin
}

searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators