Reduce impact of backendRequests on latency (#2530)
* cleanup

Signed-off-by: Joe Elliott <[email protected]>

* first pass. metrics broken

Signed-off-by: Joe Elliott <[email protected]>

* tests and benches

Signed-off-by: Joe Elliott <[email protected]>

* restore stats

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* tests + uint64 totalBlockBytes

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
joe-elliott authored Jun 2, 2023
1 parent 5807007 commit 17c141f
Showing 7 changed files with 242 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ To make use of filtering, configure `autocomplete_filtering_enabled`.
* [ENHANCEMENT] Add span filtering to spanmetrics processor [#2274](https://github.com/grafana/tempo/pull/2274) (@zalegrala)
* [ENHANCEMENT] Add ability to detect virtual nodes in the servicegraph processor [#2365](https://github.com/grafana/tempo/pull/2365) (@mapno)
* [ENHANCEMENT] Introduce `overrides.Interface` to decouple implementation from usage [#2482](https://github.com/grafana/tempo/pull/2482) (@kvrhdn)
* [ENHANCEMENT] Improve TraceQL throughput by asynchronously creating jobs [#2530](https://github.com/grafana/tempo/pull/2530) (@joe-elliott)
* [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167) (@kroksys)
* [BUGFIX] metrics-generator: ensure Prometheus will scale up shards when remote write is lagging behind [#2463](https://github.com/grafana/tempo/issues/2463) (@kvrhdn)
* [BUGFIX] Fixes issue where matches and other spanset level attributes were not persisted to the TraceQL results. [#2490](https://github.com/grafana/tempo/pull/2490)
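Note: the changelog entry above describes the core of this commit. Instead of building every backend sub-request into a slice before executing any of them, the search sharder now generates requests on a separate goroutine and streams them over a channel, so the first sub-requests start executing while later ones are still being built. The following is a minimal, self-contained Go sketch of that producer/consumer shape, not the actual Tempo code: the `backendReqMsg`, `reqCh`, and `stopCh` names come from the diff below, while `produceRequests`, the URLs, and the `main` scaffolding are invented for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
)

// backendReqMsg mirrors the struct added in searchsharding.go: either a
// request to execute or an error hit while building requests.
type backendReqMsg struct {
	req *http.Request
	err error
}

// produceRequests streams sub-requests into reqCh instead of returning a
// slice. It closes reqCh when done and stops early if stopCh is closed.
func produceRequests(n int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) {
	defer close(reqCh)
	for i := 0; i < n; i++ {
		req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://backend/search?job=%d", i), nil)
		if err != nil {
			reqCh <- &backendReqMsg{err: err}
			return
		}
		select {
		case reqCh <- &backendReqMsg{req: req}:
		case <-stopCh: // consumer gave up (limit reached, error, etc.)
			return
		}
	}
}

func main() {
	// Buffer of 1 lets the caller enqueue one high-priority request (the
	// ingester request in the real code) before the producer starts.
	reqCh := make(chan *backendReqMsg, 1)
	stopCh := make(chan struct{})
	defer close(stopCh)

	priority, _ := http.NewRequest(http.MethodGet, "http://ingester/search", nil)
	reqCh <- &backendReqMsg{req: priority}

	go produceRequests(5, reqCh, stopCh)

	// Consume with a wait group; requests start executing while later
	// ones are still being generated by the producer goroutine.
	var wg sync.WaitGroup
	for msg := range reqCh {
		if msg.err != nil {
			fmt.Println("error building request:", msg.err)
			break
		}
		wg.Add(1)
		go func(r *http.Request) {
			defer wg.Done()
			fmt.Println("would execute", r.URL)
		}(msg.req)
	}
	wg.Wait()
}
```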
8 changes: 4 additions & 4 deletions modules/frontend/search_progress.go
@@ -12,7 +12,7 @@ import (

// searchProgressFactory is used to provide a way to construct a shardedSearchProgress to the searchSharder. It exists
// so that streaming search can inject and track its own special progress object
type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress
type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress

// shardedSearchProgress is an interface that allows us to get progress
// events from the search sharding handler.
@@ -51,15 +51,15 @@ type searchProgress struct {
mtx sync.Mutex
}

func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress {
func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress {
return &searchProgress{
ctx: ctx,
statusCode: http.StatusOK,
limit: limit,
finishedRequests: 0,
resultsMetrics: &tempopb.SearchMetrics{
TotalBlocks: uint32(totalBlocks),
TotalBlockBytes: uint64(totalBlockBytes),
TotalBlockBytes: totalBlockBytes,
TotalJobs: uint32(totalJobs),
},
resultsCombiner: traceql.NewMetadataCombiner(),
@@ -118,7 +118,7 @@ func (r *searchProgress) internalShouldQuit() bool {
if r.statusCode/100 != 2 {
return true
}
if r.resultsCombiner.Count() > r.limit {
if r.resultsCombiner.Count() >= r.limit {
return true
}

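Two easy-to-miss behavioral details in this file: `totalBlockBytes` now travels as a `uint64` end to end (matching the `TotalBlockBytes` field it feeds, so no conversion is needed), and the quit check switched from `>` to `>=`, so the sharder stops as soon as `limit` results have been combined rather than waiting for one extra. A tiny illustrative stand-in for the quit condition (the `progress` type here is invented; the real method lives on `searchProgress`):

```go
// A simplified stand-in for searchProgress.internalShouldQuit, assuming a
// tracker that records a downstream HTTP status and a count of combined results.
type progress struct {
	statusCode int
	combined   int
	limit      int
}

func (p progress) shouldQuit() bool {
	if p.statusCode/100 != 2 { // any non-2xx downstream response aborts the search
		return true
	}
	return p.combined >= p.limit // ">=": quit as soon as the limit is satisfied
}
```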
3 changes: 0 additions & 3 deletions modules/frontend/search_progress_test.go
@@ -72,9 +72,6 @@ func TestSearchProgressShouldQuit(t *testing.T) {
{
TraceID: "otherthing",
},
{
TraceID: "thingthatsdifferent",
},
},
Metrics: &tempopb.SearchMetrics{},
})
4 changes: 2 additions & 2 deletions modules/frontend/search_streaming.go
@@ -32,7 +32,7 @@ type diffSearchProgress struct {
mtx sync.Mutex
}

func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) *diffSearchProgress {
func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) *diffSearchProgress {
return &diffSearchProgress{
seenTraces: map[string]struct{}{},
progress: newSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes),
@@ -119,7 +119,7 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt
}

progress := atomic.NewPointer[*diffSearchProgress](nil)
fn := func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress {
fn := func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress {
p := newDiffSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes)
progress.Store(&p)
return p
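The streaming handler needs a handle on the progress object that the sharder constructs internally, which is why a factory closure is injected rather than a concrete constructor. A reduced sketch of that pattern follows; it assumes the `go.uber.org/atomic` package that the diff appears to use, and the `tracker`, `factory`, and `runSharder` names are invented stand-ins for `diffSearchProgress`, `searchProgressFactory`, and the sharder itself.

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// tracker is a stand-in for diffSearchProgress.
type tracker struct{ totalJobs int }

// factory is a stand-in for searchProgressFactory: the sharder calls it once
// it knows how many jobs, blocks, and bytes the query covers.
type factory func(totalJobs int) *tracker

// runSharder simulates the sharder creating its progress object via the
// injected factory.
func runSharder(newProgress factory) {
	p := newProgress(42)
	_ = p // the sharder would record sub-request results into p here
}

func main() {
	// The streaming handler keeps an atomic pointer so it can read the
	// progress object from another goroutine once the sharder creates it.
	progress := atomic.NewPointer[*tracker](nil)

	fn := func(totalJobs int) *tracker {
		p := &tracker{totalJobs: totalJobs}
		progress.Store(&p)
		return p
	}

	runSharder(fn)

	if pp := progress.Load(); pp != nil {
		fmt.Println("total jobs:", (*pp).totalJobs)
	}
}
```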
3 changes: 1 addition & 2 deletions modules/frontend/search_streaming_test.go
@@ -133,8 +133,7 @@ func TestStreamingSearchHandlerStreams(t *testing.T) {
CompletedJobs: r.Metrics.CompletedJobs,
TotalJobs: 2,
TotalBlockBytes: 209715200,
},
},
}},
)
}
},
181 changes: 108 additions & 73 deletions modules/frontend/searchsharding.go
@@ -72,6 +72,11 @@ type SearchSharderConfig struct {
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
}

type backendReqMsg struct {
req *http.Request
err error
}

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, sloCfg SLOConfig, progress searchProgressFactory, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
@@ -130,37 +135,31 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// build request to search ingester based on query_ingesters_until config and time range
// pass subCtx in requests so we can cancel and exit early
ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, *searchReq)
if err != nil {
return nil, err
}

// pass subCtx in requests so we can cancel and exit early
reqs, blocks, err := s.backendRequests(subCtx, tenantID, r, *searchReq)
ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, searchReq)
if err != nil {
return nil, err
}
span.SetTag("block-count", len(blocks))

// Add ingester request if we have one. It's important to add the ingester request
// to the channel first so that it is prioritized over the possibly enormous
// number of backend requests
reqCh := make(chan *backendReqMsg, 1) // buffer of 1 allows us to insert ingestReq if it exists
stopCh := make(chan struct{})
defer close(stopCh)
if ingesterReq != nil {
reqs = append([]*http.Request{ingesterReq}, reqs...)
reqCh <- &backendReqMsg{req: ingesterReq}
}
span.SetTag("request-count", len(reqs))

// pass subCtx in requests so we can cancel and exit early
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh)

// execute requests
wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))

totalBlockBytes := uint64(0)
for _, b := range blocks {
totalBlockBytes += b.Size
}
progress := s.progress(ctx, int(searchReq.Limit), len(reqs), len(blocks), int(totalBlockBytes))
progress := s.progress(ctx, int(searchReq.Limit), totalJobs, totalBlocks, totalBlockBytes)

startedReqs := 0
for _, req := range reqs {
for req := range reqCh {
if req.err != nil {
return nil, fmt.Errorf("unexpected err building reqs: %w", req.err)
}

// if shouldQuit is true, terminate and abandon requests
if progress.shouldQuit() {
break
@@ -215,7 +214,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// happy path
progress.addResponse(results)
}(req)
}(req.req)
}

// wait for all goroutines running in wg to finish or cancelled
@@ -236,7 +235,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
"query", query,
"duration_seconds", reqTime,
"request_throughput", throughput,
"total_requests", len(reqs),
"total_requests", totalJobs,
"started_requests", startedReqs,
"cancelled_requests", cancelledReqs,
"finished_requests", overallResponse.finishedRequests,
@@ -287,8 +286,8 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// blockMetas returns all relevant blockMetas given a start/end
func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
// reduce metas to those in the requested range
metas := []*backend.BlockMeta{}
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start {
@@ -299,59 +298,91 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend
return metas
}

// backendRequest builds backend requests to search backend blocks
func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) ([]*http.Request, []*backend.BlockMeta, error) {
var err error
var reqs []*http.Request
// backendRequests builds backend requests to search backend blocks. backendRequests takes ownership of reqCh and closes it.
// it returns the estimated job count, the block count and the total bytes of the blocks covered by the query
func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) (totalJobs, totalBlocks int, totalBlockBytes uint64) {
var blocks []*backend.BlockMeta

// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
return reqs, blocks, nil
close(reqCh)
return
}

// calculate duration (start and end) to search the backend blocks
start, end := backendRange(&searchReq, s.cfg.QueryBackendAfter)
start, end := backendRange(searchReq, s.cfg.QueryBackendAfter)

// no need to search backend
if start == end {
return reqs, blocks, nil
close(reqCh)
return
}

// get block metadata of blocks in start, end duration
blocks = s.blockMetas(int64(start), int64(end), tenantID)

reqs, err = buildBackendRequests(ctx, tenantID, parent, blocks, s.cfg.TargetBytesPerRequest)
return reqs, blocks, err
targetBytesPerRequest := s.cfg.TargetBytesPerRequest

// calculate metrics to return to the caller
totalBlocks = len(blocks)
for _, b := range blocks {
p := pagesPerRequest(b, targetBytesPerRequest)

totalJobs += int(b.TotalRecords) / p
if int(b.TotalRecords)%p != 0 {
totalJobs++
}
totalBlockBytes += b.Size
}

go func() {
buildBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh)
}()

return
}

// backendRange returns a new start/end range for the backend based on the config parameter
// query_backend_after. If the returned start == the returned end then backend querying is not necessary.
func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) {
now := time.Now()
backendAfter := uint32(now.Add(-queryBackendAfter).Unix())

start := searchReq.Start
end := searchReq.End

// adjust start/end if necessary. if the entire query range was inside backendAfter then
// start will == end. This signals we don't need to query the backend.
if end > backendAfter {
end = backendAfter
}
if start > backendAfter {
start = backendAfter
}

return start, end
}

// buildBackendRequests pushes a sub-request to reqCh for every chunk of pages in each of the given blocks.
// It closes reqCh when done and stops early if stopCh is closed.
func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int) ([]*http.Request, error) {
reqs := []*http.Request{}
func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) {
defer close(reqCh)

for _, m := range metas {
if m.Size == 0 || m.TotalRecords == 0 {
pages := pagesPerRequest(m, bytesPerRequest)
if pages == 0 {
continue
}

bytesPerPage := m.Size / uint64(m.TotalRecords)
if bytesPerPage == 0 {
return nil, fmt.Errorf("block %s has an invalid 0 bytes per page", m.BlockID)
}
pagesPerQuery := bytesPerRequest / int(bytesPerPage)
if pagesPerQuery == 0 {
pagesPerQuery = 1 // have to have at least 1 page per query
}

blockID := m.BlockID.String()
for startPage := 0; startPage < int(m.TotalRecords); startPage += pagesPerQuery {
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)
subR.Header.Set(user.OrgIDHeaderName, tenantID)

subR, err := api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pagesPerQuery),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
@@ -362,22 +393,47 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req
})

if err != nil {
return nil, err
reqCh <- &backendReqMsg{err: err}
return
}

subR.RequestURI = buildUpstreamRequestURI(parent.URL.Path, subR.URL.Query())
reqs = append(reqs, subR)

select {
case reqCh <- &backendReqMsg{req: subR}:
case <-stopCh:
return
}
}
}
}

// pagesPerRequest returns an integer value that indicates the number of pages
// that should be searched per query. This value is based on the target bytes per request.
// 0 is returned if there is no valid answer
func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int {
if m.Size == 0 || m.TotalRecords == 0 {
return 0
}

return reqs, nil
bytesPerPage := m.Size / uint64(m.TotalRecords)
if bytesPerPage == 0 {
return 0
}

pagesPerQuery := bytesPerRequest / int(bytesPerPage)
if pagesPerQuery == 0 {
pagesPerQuery = 1 // have to have at least 1 page per query
}

return pagesPerQuery
}

// ingesterRequest returns a new start and end time range for the backend as well as an http request
// that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query.
// since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from
// unexpectedly changing the passed searchReq.
func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) {
func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) {
// request without start or end, search only in ingester
if searchReq.Start == 0 || searchReq.End == 0 {
return buildIngesterRequest(ctx, tenantID, parent, searchReq)
@@ -410,11 +466,11 @@ func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, pa
return buildIngesterRequest(ctx, tenantID, parent, searchReq)
}

func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) {
func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) {
subR := parent.Clone(ctx)

subR.Header.Set(user.OrgIDHeaderName, tenantID)
subR, err := api.BuildSearchRequest(subR, &searchReq)
subR, err := api.BuildSearchRequest(subR, searchReq)
if err != nil {
return nil, err
}
@@ -423,27 +479,6 @@ func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Req
return subR, nil
}

// backendRange returns a new start/end range for the backend based on the config parameter
// query_backend_after. If the returned start == the returned end then backend querying is not necessary.
func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) {
now := time.Now()
backendAfter := uint32(now.Add(-queryBackendAfter).Unix())

start := searchReq.Start
end := searchReq.End

// adjust start/end if necessary. if the entire query range was inside backendAfter then
// start will == end. This signals we don't need to query the backend.
if end > backendAfter {
end = backendAfter
}
if start > backendAfter {
start = backendAfter
}

return start, end
}

// adjusts the limit based on provided config
func adjustLimit(limit, defaultLimit, maxLimit uint32) uint32 {
if limit == 0 {
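The job accounting in `backendRequests` pairs with the new `pagesPerRequest` helper: each block contributes ceil(TotalRecords / pagesPerRequest) jobs, and blocks whose metadata yields zero pages are skipped. A small worked sketch of that arithmetic, with a made-up `blockMeta` struct and example numbers, purely for illustration:

```go
package main

import "fmt"

// blockMeta carries just the fields the sharder needs for job accounting.
type blockMeta struct {
	Size         uint64 // total bytes in the block
	TotalRecords uint32 // pages (records) in the block
}

// pagesPerRequest mirrors the helper added in searchsharding.go: how many
// pages one sub-request should cover to stay near bytesPerRequest.
func pagesPerRequest(m blockMeta, bytesPerRequest int) int {
	if m.Size == 0 || m.TotalRecords == 0 {
		return 0
	}
	bytesPerPage := m.Size / uint64(m.TotalRecords)
	if bytesPerPage == 0 {
		return 0
	}
	pages := bytesPerRequest / int(bytesPerPage)
	if pages == 0 {
		pages = 1 // always search at least one page per sub-request
	}
	return pages
}

func main() {
	// Hypothetical block: 100 MiB across 1,000 pages, targeting ~10 MiB per request.
	m := blockMeta{Size: 100 << 20, TotalRecords: 1000}
	bytesPerRequest := 10 << 20

	pages := pagesPerRequest(m, bytesPerRequest)

	// Same ceil division used to compute totalJobs in backendRequests.
	jobs := int(m.TotalRecords) / pages
	if int(m.TotalRecords)%pages != 0 {
		jobs++
	}
	fmt.Printf("pages per request: %d, jobs for this block: %d\n", pages, jobs)
}
```

With these numbers the sketch prints "pages per request: 100, jobs for this block: 10", which is the kind of count the sharder can report as TotalJobs up front even though the sub-requests themselves are built and dispatched asynchronously.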