Frontend batching #2677

Merged · 22 commits · Jul 20, 2023
Changes from 19 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* [FEATURE] New experimental API to derive on-demand RED metrics grouped by any attribute, and new metrics generator processor [#2368](https://github.com/grafana/tempo/pull/2368) [#2418](https://github.com/grafana/tempo/pull/2418) [#2424](https://github.com/grafana/tempo/pull/2424) [#2442](https://github.com/grafana/tempo/pull/2442) [#2480](https://github.com/grafana/tempo/pull/2480) [#2481](https://github.com/grafana/tempo/pull/2481) [#2501](https://github.com/grafana/tempo/pull/2501) [#2579](https://github.com/grafana/tempo/pull/2579) [#2582](https://github.com/grafana/tempo/pull/2582) (@mdisibio @zalegrala)
* [FEATURE] New TraceQL structural operators descendant (>>), child (>), and sibling (~) [#2625](https://github.com/grafana/tempo/pull/2625) [#2660](https://github.com/grafana/tempo/pull/2660) (@mdisibio)
* [FEATURE] Add user-configurable overrides module [#2543](https://github.com/grafana/tempo/pull/2543) (@electron0zero @kvrhdn)
* [ENHANCEMENT] Add support for query batching between frontend and queriers to improve throughput [#2677](https://github.com/grafana/tempo/pull/2677) (@joe-elliott)
* [ENHANCEMENT] Add initial RBAC support for serverless backend queries, limited to Google CloudRun [#2487](https://github.com/grafana/tempo/pull/2593) (@modulitos)
* [ENHANCEMENT] Add capability to flush all remaining traces to backend when ingester is stopped [#2538](https://github.com/grafana/tempo/pull/2538)
* [ENHANCEMENT] Fill parent ID column and nested set columns [#2487](https://github.com/grafana/tempo/pull/2487) (@stoewer)
12 changes: 9 additions & 3 deletions docs/sources/tempo/configuration/_index.md
@@ -386,10 +386,16 @@ query_frontend:
# (default: 2)
[max_retries: <int>]

# Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.
# (default: 2000)
[max_outstanding_per_tenant: <int>]

# The number of jobs to batch together in one http request to the querier. Set to 1 to
# disable.
# (default: 5)
[max_batch_size: <int>]

search:
# Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.
# (default: 2000)
[max_outstanding_per_tenant: <int>]

# The number of concurrent jobs to execute when searching the backend.
# (default: 1000)
41 changes: 30 additions & 11 deletions docs/sources/tempo/operations/backend_search.md
@@ -18,28 +18,44 @@ while additional compactors will more aggressively reduce the length of your blo
>**Note:** All forms of search (TraceQL and tags based) are only supported on the `vParquet` and forward blocks. [v2 blocks]({{< relref "../configuration/parquet#choose-a-different-block-format" >}})
can only be used for trace by id lookup.

## General guidelines

Tuning the search pipeline can be difficult because it requires balancing a number of different configuration parameters. The tips below
can help you get your head around the general problem, but the specifics require experimentation.

- Review the query-frontend logs for lines like the following to get a feeling for how many jobs your queries are creating:
```
level=info ts=2023-07-19T19:38:01.354220385Z caller=searchsharding.go:236 msg="sharded search query request stats and SearchMetrics" ...
```

- For a single TraceQL query the maximum number of parallel jobs is constrained by:
- `query_frontend.search.concurrent_jobs`: This is the maximum number of jobs the frontend will dispatch for one TraceQL query.
- `# queriers * querier.max_concurrent_queries * query_frontend.max_batch_size`: This is the maximum job capacity of your Tempo cluster.
If a given TraceQL query produces fewer jobs than these two values, it should be executed entirely in parallel on the queriers.

- Increasing `querier.max_concurrent_queries` is a great way to get more out of your queriers. However, if queriers are OOMing or saturating other
resources, this value should be lowered. Lowering `query_frontend.max_batch_size` will also reduce the total work attempted by one querier.
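The parallelism math in the bullets above can be sketched as a quick calculation. The following Go sketch uses hypothetical cluster values for illustration only; substitute your own deployment's configuration:

```go
package main

import "fmt"

func main() {
	// Hypothetical values for illustration only.
	concurrentJobs := 1000     // query_frontend.search.concurrent_jobs
	queriers := 10             // number of querier replicas
	maxConcurrentQueries := 20 // querier.max_concurrent_queries
	maxBatchSize := 5          // query_frontend.max_batch_size

	// Maximum job capacity of the cluster.
	capacity := queriers * maxConcurrentQueries * maxBatchSize

	// A single TraceQL query runs fully in parallel when it produces fewer
	// jobs than both limits; the effective ceiling is the smaller of the two.
	parallelism := concurrentJobs
	if capacity < parallelism {
		parallelism = capacity
	}
	fmt.Println(capacity, parallelism) // prints "1000 1000"
}
```

With these example numbers the cluster capacity (10 × 20 × 5 = 1000) exactly matches `concurrent_jobs`, so the frontend's dispatch limit is the binding constraint.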

## Configuration

Queriers and query frontends have additional configuration related
to search of the backend datastore.

### Querier

Without serverless technologies:

```
querier:
# Greatly increase the amount of work each querier will attempt
# Control the amount of work each querier will attempt. The total number of
# jobs a querier will attempt is this value * query_frontend.max_batch_size
max_concurrent_queries: 20
```

With serverless technologies:

>**Note:** Serverless can be a nice way to reduce cost by using it as spare query capacity. However, serverless tends to have higher variance than simply allowing the queriers to perform the searches themselves.

```
querier:
# The querier is only a proxy to the serverless endpoint.
# Increase this greatly to permit needed throughput.
max_concurrent_queries: 100

search:
# A list of endpoints to query. Load will be spread evenly across
@@ -55,6 +71,7 @@ querier:
external_hedge_requests_up_to: 2
```


### Query frontend

[Query frontend]({{< relref "../configuration#query-frontend" >}}) lists all configuration
@@ -71,9 +88,14 @@ server:

query_frontend:
# When increasing concurrent_jobs, also increase the queue size per tenant,
# or search requests will be cause 429 errors.
# or search requests will cause 429 errors. This is the total number of jobs
# per tenant allowed in the queue.
max_outstanding_per_tenant: 2000

# The number of jobs the query-frontend will batch together when passing jobs to the queriers.
# This value * querier.max_concurrent_queries is the max number of jobs a given querier will try at once.
max_batch_size: 5

search:
# At larger scales, increase the number of jobs attempted simultaneously,
# per search query.
@@ -87,10 +109,7 @@

## Serverless environment

Serverless is not required, but with larger loads, serverless is recommended to reduce costs and
improve performance. If you find that you are scaling up your quantity of queriers, yet are not
achieving the latencies you would like, switch to serverless.

Serverless is not required, but with larger loads, serverless can be used to reduce costs.
Tempo has support for Google Cloud Run and AWS Lambda. In both cases, you will use the following
settings to configure Tempo to use a serverless environment:

6 changes: 4 additions & 2 deletions example/docker-compose/distributed/docker-compose.yaml
@@ -39,7 +39,7 @@ services:

query-frontend:
image: grafana/tempo:latest
command: "-target=query-frontend -config.file=/etc/tempo.yaml"
command: "-target=query-frontend -config.file=/etc/tempo.yaml -log.level=debug"
restart: always
volumes:
- ./tempo-distributed.yaml:/etc/tempo.yaml
@@ -48,7 +48,7 @@

querier:
image: grafana/tempo:latest
command: "-target=querier -config.file=/etc/tempo.yaml"
command: "-target=querier -config.file=/etc/tempo.yaml -log.level=debug"
restart: always
volumes:
- ./tempo-distributed.yaml:/etc/tempo.yaml
@@ -92,6 +92,8 @@ services:
restart: always
depends_on:
- distributor
logging:
driver: "none"

prometheus:
image: prom/prometheus:latest
1 change: 1 addition & 0 deletions modules/frontend/config.go
@@ -53,6 +53,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
}

cfg.Config.MaxOutstandingPerTenant = 2000
cfg.Config.MaxBatchSize = 5
cfg.MaxRetries = 2
cfg.Search = SearchConfig{
Sharder: SearchSharderConfig{
86 changes: 64 additions & 22 deletions modules/frontend/queue/queue.go
@@ -51,7 +51,7 @@

connectedQuerierWorkers *atomic.Int32

mtx sync.Mutex
mtx sync.RWMutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *queues
stopped bool
@@ -79,39 +79,72 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
// between calls.
//
// If the per-tenant queue is full, the request is discarded and ErrTooManyRequests is returned.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
q.mtx.Lock()
defer q.mtx.Unlock()
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int) error {
q.mtx.RLock()
// don't defer a release. we won't know what we need to release until we call getQueueUnderRlock

if q.stopped {
q.mtx.RUnlock()
return ErrStopped
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
if queue == nil {
// This can only happen if userID is "".
return errors.New("no queue found")
// try to grab the user queue under read lock
queue, cleanup, err := q.getQueueUnderRlock(userID, maxQueriers)
defer cleanup()
if err != nil {
return err
}

select {
case queue <- req:
q.queueLength.WithLabelValues(userID).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
if successFn != nil {
successFn()
}
return nil
default:
q.discardedRequests.WithLabelValues(userID).Inc()
return ErrTooManyRequests
}
}

// GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests.
// By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly.
// If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error) {
// getQueueUnderRlock attempts to get the queue for the given user under read lock. if it is not
// possible it upgrades the RLock to a Lock. This method also returns a cleanup function that
// will release whichever lock it had to acquire to get the queue.
func (q *RequestQueue) getQueueUnderRlock(userID string, maxQueriers int) (chan Request, func(), error) {
cleanup := func() {
q.mtx.RUnlock()
}

uq := q.queues.userQueues[userID]
if uq != nil {
return uq.ch, cleanup, nil
}

// trade the read lock for a rw lock and then defer the opposite
// this method should be called under RLock() and return under RLock()
q.mtx.RUnlock()
q.mtx.Lock()

cleanup = func() {
q.mtx.Unlock()
}

queue := q.queues.getOrAddQueue(userID, maxQueriers)
if queue == nil {
// This can only happen if userID is "".
return nil, cleanup, errors.New("no queue found")
}

return queue, cleanup, nil
}
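The lock dance in `getQueueUnderRlock` is an instance of a general read-lock-upgrade pattern: serve the common case under `RLock`, and take the write lock only when the entry must be created. A standalone sketch of that pattern, using a hypothetical string-to-int store rather than Tempo's actual types (note the re-check after upgrading, since Go's `sync.RWMutex` cannot upgrade in place and another goroutine may win the race):

```go
package main

import (
	"fmt"
	"sync"
)

type store struct {
	mtx sync.RWMutex
	m   map[string]int
}

// getOrCreate returns the value for key, creating it under the write lock
// only when the read-locked fast path misses. The returned cleanup func
// releases whichever lock was ultimately acquired, mirroring the queue code.
func (s *store) getOrCreate(key string) (int, func()) {
	s.mtx.RLock()
	if v, ok := s.m[key]; ok {
		return v, s.mtx.RUnlock
	}
	// Upgrade: drop the read lock, take the write lock. Another goroutine
	// may have created the entry in between, so re-check before inserting.
	s.mtx.RUnlock()
	s.mtx.Lock()
	if v, ok := s.m[key]; ok {
		return v, s.mtx.Unlock
	}
	s.m[key] = 42 // arbitrary value for the sketch
	return 42, s.mtx.Unlock
}

func main() {
	s := &store{m: map[string]int{}}
	v, cleanup := s.getOrCreate("tenant-a")
	cleanup()
	fmt.Println(v) // prints "42"
}
```

Returning the cleanup function rather than deferring inside the method lets the caller hold the correct lock across its subsequent work, which is exactly why `EnqueueRequest` avoids a plain `defer`.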

// GetNextRequestForQuerier finds the next user queue and attempts to dequeue up to N requests, where N is the
// length of batchBuffer. This slice is a reusable buffer that is filled with requests.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string, batchBuffer []Request) ([]Request, UserIndex, error) {
requestedCount := len(batchBuffer)
if requestedCount == 0 {
return nil, last, errors.New("batch buffer must have len > 0")
}

q.mtx.Lock()
defer q.mtx.Unlock()

@@ -135,18 +168,27 @@ FindQueue:
queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
last.last = idx
if queue != nil {
// Pick next request from the queue.
request := <-queue
if len(queue) == 0 {
q.queues.deleteQueue(userID)
// this is all thread-safe because all user queues are protected by q.mtx
if len(queue) < requestedCount {
requestedCount = len(queue)
}

q.queueLength.WithLabelValues(userID).Dec()
// Pick next requests from the queue.
batchBuffer = batchBuffer[:requestedCount]
for i := 0; i < requestedCount; i++ {
batchBuffer[i] = <-queue
}

qLen := len(queue)
if qLen == 0 {
q.queues.deleteQueue(userID)
}
q.queueLength.WithLabelValues(userID).Set(float64(qLen))

// Tell close() we've processed a request.
q.cond.Broadcast()

return request, last, nil
return batchBuffer, last, nil
}

// There are no unexpired requests, so we can get back
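The batched dequeue above relies on the frontend's mutex to make the `len(queue)` check and the subsequent channel receives atomic with respect to enqueues. A simplified standalone sketch of the same drain-up-to-N idea, using toy types rather than the real queue:

```go
package main

import "fmt"

// drainBatch fills batchBuffer with up to len(batchBuffer) items from queue,
// taking only what is immediately available. It assumes the caller holds
// whatever lock protects queue, so len(queue) cannot change between the
// length check and the receives (as q.mtx guarantees in the real code).
func drainBatch(queue chan int, batchBuffer []int) []int {
	n := len(batchBuffer)
	if avail := len(queue); avail < n {
		n = avail
	}
	batchBuffer = batchBuffer[:n]
	for i := 0; i < n; i++ {
		batchBuffer[i] = <-queue
	}
	return batchBuffer
}

func main() {
	q := make(chan int, 10)
	for i := 1; i <= 3; i++ {
		q <- i
	}
	buf := make([]int, 5) // request up to 5; only 3 are available
	got := drainBatch(q, buf)
	fmt.Println(got) // prints "[1 2 3]"
}
```

Reslicing the caller-owned buffer to the actual count avoids allocating per call, which is the point of passing `batchBuffer` in rather than returning a fresh slice.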