Reduce impact of backendRequests on latency #2530

Merged · 7 commits · Jun 2, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ To make use of filtering, configure `autocomplete_filtering_enabled`.
* [ENHANCEMENT] Add span filtering to spanmetrics processor [#2274](https://github.com/grafana/tempo/pull/2274) (@zalegrala)
* [ENHANCEMENT] Add ability to detect virtual nodes in the servicegraph processor [#2365](https://github.com/grafana/tempo/pull/2365) (@mapno)
* [ENHANCEMENT] Introduce `overrides.Interface` to decouple implementation from usage [#2482](https://github.com/grafana/tempo/pull/2482) (@kvrhdn)
* [ENHANCEMENT] Improve TraceQL throughput by asynchronously creating jobs [#2530](https://github.com/grafana/tempo/pull/2530) (@joe-elliott)
* [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167) (@kroksys)
* [BUGFIX] metrics-generator: ensure Prometheus will scale up shards when remote write is lagging behind [#2463](https://github.com/grafana/tempo/issues/2463) (@kvrhdn)
* [BUGFIX] Fixes issue where matches and other spanset level attributes were not persisted to the TraceQL results. [#2490](https://github.com/grafana/tempo/pull/2490)
8 changes: 4 additions & 4 deletions modules/frontend/search_progress.go
@@ -11,7 +11,7 @@ import (

// searchProgressFactory is used to provide a way to construct a shardedSearchProgress to the searchSharder. It exists
// so that streaming search can inject and track its own special progress object
type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress
type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress

// shardedSearchProgress is an interface that allows us to get progress
// events from the search sharding handler.
@@ -50,15 +50,15 @@ type searchProgress struct {
mtx sync.Mutex
}

func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress {
func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress {
return &searchProgress{
ctx: ctx,
statusCode: http.StatusOK,
limit: limit,
finishedRequests: 0,
resultsMetrics: &tempopb.SearchMetrics{
TotalBlocks: uint32(totalBlocks),
TotalBlockBytes: uint64(totalBlockBytes),
TotalBlockBytes: totalBlockBytes,
TotalJobs: uint32(totalJobs),
},
resultsCombiner: traceql.NewMetadataCombiner(),
@@ -117,7 +117,7 @@ func (r *searchProgress) internalShouldQuit() bool {
if r.statusCode/100 != 2 {
return true
}
if r.resultsCombiner.Count() > r.limit {
if r.resultsCombiner.Count() >= r.limit {
return true
}
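
This `>=` is the behavioral fix hidden among the type changes: previously the combiner had to exceed the limit before the sharder stopped, so a search with limit = 2 kept scheduling work after the second distinct trace was already in hand. A rough illustration of the new check, assuming the combiner exposes an AddMetadata method alongside the Count used above (the test change in the next file drops its third trace ID for the same reason):

c := traceql.NewMetadataCombiner()
c.AddMetadata(&tempopb.TraceSearchMetadata{TraceID: "thing"})
c.AddMetadata(&tempopb.TraceSearchMetadata{TraceID: "otherthing"})

limit := 2
fmt.Println(c.Count() > limit)  // false: the old check would keep dispatching jobs
fmt.Println(c.Count() >= limit) // true:  the new check quits once the limit is reached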

3 changes: 0 additions & 3 deletions modules/frontend/search_progress_test.go
@@ -70,9 +70,6 @@ func TestSearchProgressShouldQuit(t *testing.T) {
{
TraceID: "otherthing",
},
{
TraceID: "thingthatsdifferent",
},
},
Metrics: &tempopb.SearchMetrics{},
})
4 changes: 2 additions & 2 deletions modules/frontend/search_streaming.go
@@ -32,7 +32,7 @@ type diffSearchProgress struct {
mtx sync.Mutex
}

func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) *diffSearchProgress {
func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) *diffSearchProgress {
return &diffSearchProgress{
seenTraces: map[string]struct{}{},
progress: newSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes),
@@ -119,7 +119,7 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt
}

progress := atomic.NewPointer[*diffSearchProgress](nil)
fn := func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress {
fn := func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress {
p := newDiffSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes)
progress.Store(&p)
return p
3 changes: 1 addition & 2 deletions modules/frontend/search_streaming_test.go
@@ -133,8 +133,7 @@ func TestStreamingSearchHandlerStreams(t *testing.T) {
CompletedJobs: r.Metrics.CompletedJobs,
TotalJobs: 2,
TotalBlockBytes: 209715200,
},
},
}},
)
}
},
181 changes: 108 additions & 73 deletions modules/frontend/searchsharding.go
@@ -72,6 +72,11 @@ type SearchSharderConfig struct {
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
}

type backendReqMsg struct {
req *http.Request
err error
}
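
backendReqMsg lets a single channel carry either a ready-to-send sub-request or a terminal error from the goroutine that builds jobs. The handshake the rest of this diff wires through RoundTrip and buildBackendRequests, reduced to a standalone sketch (simplified names and a placeholder URL, not the exact Tempo code; assumes context and net/http are imported):

// The producer owns reqCh and always closes it; it bails out early if the
// consumer signals stopCh (limit reached, downstream error, caller gone).
func produce(ctx context.Context, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) {
    defer close(reqCh)
    for i := 0; i < 3; i++ {
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.invalid/job", nil) // placeholder job
        if err != nil {
            reqCh <- &backendReqMsg{err: err}
            return
        }
        select {
        case reqCh <- &backendReqMsg{req: req}:
        case <-stopCh:
            return
        }
    }
}

// The consumer ranges until the channel is closed and fails fast on an error.
func consume(ctx context.Context) error {
    reqCh := make(chan *backendReqMsg, 1) // buffer of 1 leaves room for an up-front ingester request
    stopCh := make(chan struct{})
    defer close(stopCh)

    go produce(ctx, reqCh, stopCh)
    for msg := range reqCh {
        if msg.err != nil {
            return msg.err
        }
        _ = msg.req // dispatch the sub-request from here
    }
    return nil
}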

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, sloCfg SLOConfig, progress searchProgressFactory, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
@@ -130,37 +135,31 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// build request to search ingester based on query_ingesters_until config and time range
// pass subCtx in requests so we can cancel and exit early
ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, *searchReq)
if err != nil {
return nil, err
}

// pass subCtx in requests so we can cancel and exit early
reqs, blocks, err := s.backendRequests(subCtx, tenantID, r, *searchReq)
ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, searchReq)
if err != nil {
return nil, err
}
span.SetTag("block-count", len(blocks))

// Add ingester request if we have one. it's important to add the ingester request to
// the beginning of the slice so that it is prioritized over the possibly enormous
// number of backend requests
reqCh := make(chan *backendReqMsg, 1) // buffer of 1 allows us to insert ingestReq if it exists
stopCh := make(chan struct{})
defer close(stopCh)
if ingesterReq != nil {
reqs = append([]*http.Request{ingesterReq}, reqs...)
reqCh <- &backendReqMsg{req: ingesterReq}
}
span.SetTag("request-count", len(reqs))

// pass subCtx in requests so we can cancel and exit early
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh)

// execute requests
wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))

totalBlockBytes := uint64(0)
for _, b := range blocks {
totalBlockBytes += b.Size
}
progress := s.progress(ctx, int(searchReq.Limit), len(reqs), len(blocks), int(totalBlockBytes))
progress := s.progress(ctx, int(searchReq.Limit), totalJobs, totalBlocks, totalBlockBytes)

startedReqs := 0
for _, req := range reqs {
for req := range reqCh {
if req.err != nil {
return nil, fmt.Errorf("unexpected err building reqs: %w", req.err)
}

// if shouldQuit is true, terminate and abandon requests
if progress.shouldQuit() {
break
@@ -215,7 +214,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// happy path
progress.addResponse(results)
}(req)
}(req.req)
}

// wait for all goroutines running in wg to finish or cancelled
@@ -236,7 +235,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
"query", query,
"duration_seconds", reqTime,
"request_throughput", throughput,
"total_requests", len(reqs),
"total_requests", totalJobs,
"started_requests", startedReqs,
"cancelled_requests", cancelledReqs,
"finished_requests", overallResponse.finishedRequests,
@@ -287,8 +286,8 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// blockMetas returns all relevant blockMetas given a start/end
func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
metas := []*backend.BlockMeta{}
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start {
Expand All @@ -299,59 +298,91 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend
return metas
}

// backendRequest builds backend requests to search backend blocks
func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) ([]*http.Request, []*backend.BlockMeta, error) {
var err error
var reqs []*http.Request
// backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it.
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs
func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) (totalJobs, totalBlocks int, totalBlockBytes uint64) {
var blocks []*backend.BlockMeta

// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
return reqs, blocks, nil
close(reqCh)
return
}

// calculate duration (start and end) to search the backend blocks
start, end := backendRange(&searchReq, s.cfg.QueryBackendAfter)
start, end := backendRange(searchReq, s.cfg.QueryBackendAfter)

// no need to search backend
if start == end {
return reqs, blocks, nil
close(reqCh)
return
}

// get block metadata of blocks in start, end duration
blocks = s.blockMetas(int64(start), int64(end), tenantID)

reqs, err = buildBackendRequests(ctx, tenantID, parent, blocks, s.cfg.TargetBytesPerRequest)
return reqs, blocks, err
targetBytesPerRequest := s.cfg.TargetBytesPerRequest

// calculate metrics to return to the caller
totalBlocks = len(blocks)
for _, b := range blocks {
p := pagesPerRequest(b, targetBytesPerRequest)

totalJobs += int(b.TotalRecords) / p
if int(b.TotalRecords)%p != 0 {
totalJobs++
}
totalBlockBytes += b.Size
}

go func() {
buildBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh)
}()

return
}
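
The job count above is a ceiling division: every block contributes TotalRecords / pagesPerRequest jobs, rounded up when the records don't divide evenly. A worked example with made-up numbers (not taken from the PR):

// block:  100 MB across 1,000 records -> ~100 KB per record ("page")
// target: 10 MB per sub-request       -> 100 pages per job
// jobs:   ceil(1000 / 100)            -> 10 jobs for this block
b := &backend.BlockMeta{Size: 100 * 1024 * 1024, TotalRecords: 1000}
p := pagesPerRequest(b, 10*1024*1024) // 100

jobs := int(b.TotalRecords) / p
if int(b.TotalRecords)%p != 0 {
    jobs++ // round up for the final, partial job
}
fmt.Println(jobs) // 10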

// backendRange returns a new start/end range for the backend based on the config parameter
// query_backend_after. If the returned start == the returned end then backend querying is not necessary.
func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) {
now := time.Now()
backendAfter := uint32(now.Add(-queryBackendAfter).Unix())

start := searchReq.Start
end := searchReq.End

// adjust start/end if necessary. if the entire query range was inside backendAfter then
// start will == end. This signals we don't need to query the backend.
if end > backendAfter {
end = backendAfter
}
if start > backendAfter {
start = backendAfter
}

return start, end
}
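
Concretely: with query_backend_after set to 15m, anything newer than 15 minutes ago is clipped off the backend range, and a query that lies entirely inside that window collapses to start == end, which backendRequests above treats as "ingesters only". A small sketch with hypothetical values:

now := uint32(time.Now().Unix())
req := &tempopb.SearchRequest{Start: now - 600, End: now} // the last 10 minutes

start, end := backendRange(req, 15*time.Minute)
fmt.Println(start == end) // true: the whole range is newer than query_backend_after,
                          // so no backend blocks need to be searched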

// buildBackendRequests returns a slice of requests that cover all blocks in the store
// that are covered by start/end.
func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int) ([]*http.Request, error) {
reqs := []*http.Request{}
func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) {
defer close(reqCh)

for _, m := range metas {
if m.Size == 0 || m.TotalRecords == 0 {
pages := pagesPerRequest(m, bytesPerRequest)
if pages == 0 {
continue
}

bytesPerPage := m.Size / uint64(m.TotalRecords)
if bytesPerPage == 0 {
return nil, fmt.Errorf("block %s has an invalid 0 bytes per page", m.BlockID)
}
pagesPerQuery := bytesPerRequest / int(bytesPerPage)
if pagesPerQuery == 0 {
pagesPerQuery = 1 // have to have at least 1 page per query
}

blockID := m.BlockID.String()
for startPage := 0; startPage < int(m.TotalRecords); startPage += pagesPerQuery {
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)
subR.Header.Set(user.OrgIDHeaderName, tenantID)

subR, err := api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pagesPerQuery),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
@@ -362,22 +393,47 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req
})

if err != nil {
return nil, err
reqCh <- &backendReqMsg{err: err}
return
}

subR.RequestURI = buildUpstreamRequestURI(parent.URL.Path, subR.URL.Query())
reqs = append(reqs, subR)

select {
case reqCh <- &backendReqMsg{req: subR}:

Review comment (Contributor):

SearchSharderRoundTrip50000-8     318ms ± 4%     472ms ± 2%  +48.65% 

One thought on this is we could reduce the channel overhead by sending batched requests instead of individually. Looking at the code the easiest split is probably all jobs for a block in one channel send here.


Reply from @joe-elliott (Member, Author), Jun 1, 2023:

A naive attempt was worse (before = this PR, after = this PR with batching as suggested):

name                           old time/op    new time/op    delta
SearchSharderRoundTrip5-8         659µs ± 1%    1108µs ±83%   +68.15%  (p=0.016 n=4+5)
SearchSharderRoundTrip500-8      12.2ms ± 4%    12.9ms ± 6%      ~     (p=0.056 n=5+5)
SearchSharderRoundTrip50000-8     474ms ±11%     540ms ± 8%   +13.91%  (p=0.032 n=5+5)

name                           old alloc/op   new alloc/op   delta
SearchSharderRoundTrip5-8         451kB ± 0%     588kB ±54%   +30.19%  (p=0.008 n=5+5)
SearchSharderRoundTrip500-8      4.80MB ± 0%    4.96MB ± 0%    +3.43%  (p=0.008 n=5+5)
SearchSharderRoundTrip50000-8     176MB ± 0%     181MB ± 0%    +3.03%  (p=0.016 n=5+4)

name                           old allocs/op  new allocs/op  delta
SearchSharderRoundTrip5-8         1.33k ± 2%    2.98k ±133%  +123.52%  (p=0.008 n=5+5)
SearchSharderRoundTrip500-8       57.2k ± 0%     58.0k ± 0%    +1.24%  (p=0.008 n=5+5)
SearchSharderRoundTrip50000-8     2.26M ± 0%     2.28M ± 0%    +0.88%  (p=0.008 n=5+5)

I think the additional memory management offsets it. Personally, i'm not concerned about that +40%. Even in that case the overall performance is going to be significantly better b/c we're getting jobs to queriers faster.

That first benchmark, SearchSharderRoundTrip5, is the most interesting because it roughly represents "time to first job", which is the real improvement here.
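
For readers wondering what was benchmarked in the "after" column: a rough approximation of the batched alternative discussed here, with one channel send per block carrying all of that block's sub-requests (an illustration of the idea, not code from this PR), could be:

// Hypothetical batched message: one send per block instead of one per job.
type backendReqBatch struct {
    reqs []*http.Request
    err  error
}

// Per block inside buildBackendRequests, the per-job loop would fill a slice
// and hand it over in a single send.
func sendBlockBatch(ctx context.Context, parent *http.Request, totalRecords, pages int,
    batchCh chan<- *backendReqBatch, stopCh <-chan struct{}) {
    batch := &backendReqBatch{reqs: make([]*http.Request, 0, totalRecords/pages+1)}
    for startPage := 0; startPage < totalRecords; startPage += pages {
        subR := parent.Clone(ctx) // plus the same BuildSearchBlockRequest call as above
        batch.reqs = append(batch.reqs, subR)
    }
    select {
    case batchCh <- batch: // the consumer then ranges over batch.reqs
    case <-stopCh:
    }
}

The per-block slices are presumably the "additional memory management" mentioned above: with tens of thousands of jobs, the extra allocations appear to outweigh the channel sends they save.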

case <-stopCh:
return
}
}
}
}

// pagesPerRequest returns an integer value that indicates the number of pages
// that should be searched per query. This value is based on the target number of bytes
// 0 is returned if there is no valid answer
func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int {
if m.Size == 0 || m.TotalRecords == 0 {
return 0
}

return reqs, nil
bytesPerPage := m.Size / uint64(m.TotalRecords)
if bytesPerPage == 0 {
return 0
}

pagesPerQuery := bytesPerRequest / int(bytesPerPage)
if pagesPerQuery == 0 {
pagesPerQuery = 1 // have to have at least 1 page per query
}

return pagesPerQuery
}

// ingesterRequest returns a new start and end time range for the backend as well as an http request
// that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query.
// since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from
// unexpectedly changing the passed searchReq.
func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) {
func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) {
// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
return buildIngesterRequest(ctx, tenantID, parent, searchReq)
@@ -410,11 +466,11 @@ func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, pa
return buildIngesterRequest(ctx, tenantID, parent, searchReq)
}

func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) {
func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) {
subR := parent.Clone(ctx)

subR.Header.Set(user.OrgIDHeaderName, tenantID)
subR, err := api.BuildSearchRequest(subR, &searchReq)
subR, err := api.BuildSearchRequest(subR, searchReq)
if err != nil {
return nil, err
}
@@ -423,27 +479,6 @@ func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Req
return subR, nil
}

// backendRange returns a new start/end range for the backend based on the config parameter
// query_backend_after. If the returned start == the returned end then backend querying is not necessary.
func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) {
now := time.Now()
backendAfter := uint32(now.Add(-queryBackendAfter).Unix())

start := searchReq.Start
end := searchReq.End

// adjust start/end if necessary. if the entire query range was inside backendAfter then
// start will == end. This signals we don't need to query the backend.
if end > backendAfter {
end = backendAfter
}
if start > backendAfter {
start = backendAfter
}

return start, end
}

// adjusts the limit based on provided config
func adjustLimit(limit, defaultLimit, maxLimit uint32) uint32 {
if limit == 0 {