From 1e468d2f29400588645b448cbd78adce99b05d3d Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 31 May 2023 12:49:55 -0400 Subject: [PATCH 1/6] cleanup Signed-off-by: Joe Elliott --- modules/frontend/searchsharding.go | 56 ++++++++++++------------- modules/frontend/searchsharding_test.go | 4 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go index f5173fd2284..c40e782e429 100644 --- a/modules/frontend/searchsharding.go +++ b/modules/frontend/searchsharding.go @@ -130,13 +130,13 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { // build request to search ingester based on query_ingesters_until config and time range // pass subCtx in requests so we can cancel and exit early - ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, *searchReq) + ingesterReq, err := s.ingesterRequest(subCtx, tenantID, r, searchReq) if err != nil { return nil, err } // pass subCtx in requests so we can cancel and exit early - reqs, blocks, err := s.backendRequests(subCtx, tenantID, r, *searchReq) + reqs, blocks, err := s.backendRequests(subCtx, tenantID, r, searchReq) if err != nil { return nil, err } @@ -300,7 +300,7 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend } // backendRequest builds backend requests to search backend blocks -func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) ([]*http.Request, []*backend.BlockMeta, error) { +func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) ([]*http.Request, []*backend.BlockMeta, error) { var err error var reqs []*http.Request var blocks []*backend.BlockMeta @@ -311,7 +311,7 @@ func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, pa } // calculate duration (start and end) to search the backend blocks - start, end := backendRange(&searchReq, s.cfg.QueryBackendAfter) + start, end := backendRange(searchReq, s.cfg.QueryBackendAfter) // no need to search backend if start == end { @@ -325,6 +325,27 @@ func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, pa return reqs, blocks, err } +// backendRange returns a new start/end range for the backend based on the config parameter +// query_backend_after. If the returned start == the returned end then backend querying is not necessary. +func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) { + now := time.Now() + backendAfter := uint32(now.Add(-queryBackendAfter).Unix()) + + start := searchReq.Start + end := searchReq.End + + // adjust start/end if necessary. if the entire query range was inside backendAfter then + // start will == end. This signals we don't need to query the backend. + if end > backendAfter { + end = backendAfter + } + if start > backendAfter { + start = backendAfter + } + + return start, end +} + // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int) ([]*http.Request, error) { @@ -377,7 +398,7 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. 
// since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from // unexpectedly changing the passed searchReq. -func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) { +func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) { // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { return buildIngesterRequest(ctx, tenantID, parent, searchReq) @@ -410,11 +431,11 @@ func (s *searchSharder) ingesterRequest(ctx context.Context, tenantID string, pa return buildIngesterRequest(ctx, tenantID, parent, searchReq) } -func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest) (*http.Request, error) { +func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) (*http.Request, error) { subR := parent.Clone(ctx) subR.Header.Set(user.OrgIDHeaderName, tenantID) - subR, err := api.BuildSearchRequest(subR, &searchReq) + subR, err := api.BuildSearchRequest(subR, searchReq) if err != nil { return nil, err } @@ -423,27 +444,6 @@ func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Req return subR, nil } -// backendRange returns a new start/end range for the backend based on the config parameter -// query_backend_after. If the returned start == the returned end then backend querying is not necessary. -func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Duration) (uint32, uint32) { - now := time.Now() - backendAfter := uint32(now.Add(-queryBackendAfter).Unix()) - - start := searchReq.Start - end := searchReq.End - - // adjust start/end if necessary. if the entire query range was inside backendAfter then - // start will == end. This signals we don't need to query the backend. - if end > backendAfter { - end = backendAfter - } - if start > backendAfter { - start = backendAfter - } - - return start, end -} - // adjusts the limit based on provided config func adjustLimit(limit, defaultLimit, maxLimit uint32) uint32 { if limit == 0 { diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index 4da82991ab9..39581a86114 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -271,7 +271,7 @@ func TestBackendRequests(t *testing.T) { searchReq, err := api.ParseSearchRequest(r) require.NoError(t, err) - reqs, blocks, err := s.backendRequests(context.TODO(), "test", r, *searchReq) + reqs, blocks, err := s.backendRequests(context.TODO(), "test", r, searchReq) assert.Equal(t, tc.expectedError, err) reqURIs := make([]string, 0) @@ -365,7 +365,7 @@ func TestIngesterRequest(t *testing.T) { searchReq, err := api.ParseSearchRequest(req) require.NoError(t, err) - actualReq, err := s.ingesterRequest(context.Background(), "test", req, *searchReq) + actualReq, err := s.ingesterRequest(context.Background(), "test", req, searchReq) if tc.expectedError != nil { assert.Equal(t, tc.expectedError, err) continue From a7a856faaf5bd67c0be101c0d05abfbd988232d6 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 31 May 2023 13:36:57 -0400 Subject: [PATCH 2/6] first pass. 
metrics broken Signed-off-by: Joe Elliott --- modules/frontend/search_streaming_test.go | 17 ++-- modules/frontend/searchsharding.go | 113 +++++++++++++--------- modules/frontend/searchsharding_test.go | 62 ++++++++---- 3 files changed, 118 insertions(+), 74 deletions(-) diff --git a/modules/frontend/search_streaming_test.go b/modules/frontend/search_streaming_test.go index cf953ab2ea0..34a7929fef5 100644 --- a/modules/frontend/search_streaming_test.go +++ b/modules/frontend/search_streaming_test.go @@ -88,10 +88,10 @@ func TestStreamingSearchHandlerSucceeds(t *testing.T) { &tempopb.SearchResponse{ Traces: traceResp, Metrics: &tempopb.SearchMetrics{ - TotalBlocks: 1, + TotalBlocks: 10, // jpe : should be 1 CompletedJobs: 2, - TotalJobs: 2, - TotalBlockBytes: 209715200, + TotalJobs: 10, // jpe : should be 2 + TotalBlockBytes: 1000, // jpe - should be 209715200 }, }, ) @@ -129,12 +129,11 @@ func TestStreamingSearchHandlerStreams(t *testing.T) { &tempopb.SearchResponse{ Traces: traceResp, Metrics: &tempopb.SearchMetrics{ - TotalBlocks: 1, - CompletedJobs: r.Metrics.CompletedJobs, - TotalJobs: 2, - TotalBlockBytes: 209715200, - }, - }, + TotalBlocks: 10, // jpe : should be 1 + CompletedJobs: 1, // jpe : can't get to pass? + TotalJobs: 10, // jpe : should be 2 + TotalBlockBytes: 1000, // jpe - should be 209715200 + }}, ) } }, diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go index c40e782e429..c073c42ad33 100644 --- a/modules/frontend/searchsharding.go +++ b/modules/frontend/searchsharding.go @@ -72,6 +72,13 @@ type SearchSharderConfig struct { QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"` } +// jpe - pool me? +type backendReqMsg struct { + req *http.Request + meta *backend.BlockMeta + err error +} + // newSearchSharder creates a sharding middleware for search func newSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, sloCfg SLOConfig, progress searchProgressFactory, logger log.Logger) Middleware { return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { @@ -135,32 +142,37 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { return nil, err } - // pass subCtx in requests so we can cancel and exit early - reqs, blocks, err := s.backendRequests(subCtx, tenantID, r, searchReq) - if err != nil { - return nil, err - } - span.SetTag("block-count", len(blocks)) - - // Add ingester request if we have one. it's important to add the ingester request to - // the beginning of the slice so that it is prioritized over the possibly enormous - // number of backend requests + reqCh := make(chan *backendReqMsg, 1) // buffer of 1 allows us to insert ingestReq if it exists + stopCh := make(chan struct{}) + defer close(stopCh) if ingesterReq != nil { - reqs = append([]*http.Request{ingesterReq}, reqs...) + reqCh <- &backendReqMsg{req: ingesterReq} } - span.SetTag("request-count", len(reqs)) + + // pass subCtx in requests so we can cancel and exit early + s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh) // execute requests wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests)) - totalBlockBytes := uint64(0) - for _, b := range blocks { - totalBlockBytes += b.Size - } - progress := s.progress(ctx, int(searchReq.Limit), len(reqs), len(blocks), int(totalBlockBytes)) + // jpe restore this, can we pull block meta list w/o much penalty? 
+ // totalBlockBytes := uint64(0) + // for _, b := range blocks { + // totalBlockBytes += b.Size + // } + progress := s.progress(ctx, int(searchReq.Limit), 10 /*len(reqs)*/, 10 /*len(blocks)*/, 1000 /*int(totalBlockBytes)*/) startedReqs := 0 - for _, req := range reqs { + for req := range reqCh { + if req.err != nil { + return nil, fmt.Errorf("unexpected err building reqs: %w", req.err) + } + + if req.meta != nil { + // jpe do something with this? or return metas above? + continue + } + // if shouldQuit is true, terminate and abandon requests if progress.shouldQuit() { break @@ -215,7 +227,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { // happy path progress.addResponse(results) - }(req) + }(req.req) } // wait for all goroutines running in wg to finish or cancelled @@ -236,7 +248,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { "query", query, "duration_seconds", reqTime, "request_throughput", throughput, - "total_requests", len(reqs), + // "total_requests", len(reqs), jpe "started_requests", startedReqs, "cancelled_requests", cancelledReqs, "finished_requests", overallResponse.finishedRequests, @@ -300,29 +312,30 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend } // backendRequest builds backend requests to search backend blocks -func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest) ([]*http.Request, []*backend.BlockMeta, error) { - var err error - var reqs []*http.Request - var blocks []*backend.BlockMeta - - // request without start or end, search only in ingester - if searchReq.Start == 0 || searchReq.End == 0 { - return reqs, blocks, nil - } +func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) { + go func() { + var blocks []*backend.BlockMeta + + // request without start or end, search only in ingester + if searchReq.Start == 0 || searchReq.End == 0 { + close(reqCh) + return + } - // calculate duration (start and end) to search the backend blocks - start, end := backendRange(searchReq, s.cfg.QueryBackendAfter) + // calculate duration (start and end) to search the backend blocks + start, end := backendRange(searchReq, s.cfg.QueryBackendAfter) - // no need to search backend - if start == end { - return reqs, blocks, nil - } + // no need to search backend + if start == end { + close(reqCh) + return + } - // get block metadata of blocks in start, end duration - blocks = s.blockMetas(int64(start), int64(end), tenantID) + // get block metadata of blocks in start, end duration + blocks = s.blockMetas(int64(start), int64(end), tenantID) - reqs, err = buildBackendRequests(ctx, tenantID, parent, blocks, s.cfg.TargetBytesPerRequest) - return reqs, blocks, err + buildBackendRequests(ctx, tenantID, parent, blocks, s.cfg.TargetBytesPerRequest, reqCh, stopCh) + }() } // backendRange returns a new start/end range for the backend based on the config parameter @@ -348,16 +361,20 @@ func backendRange(searchReq *tempopb.SearchRequest, queryBackendAfter time.Durat // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. 
-func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int) ([]*http.Request, error) { - reqs := []*http.Request{} +func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) { + defer close(reqCh) + for _, m := range metas { + reqCh <- &backendReqMsg{meta: m} + if m.Size == 0 || m.TotalRecords == 0 { continue } bytesPerPage := m.Size / uint64(m.TotalRecords) if bytesPerPage == 0 { - return nil, fmt.Errorf("block %s has an invalid 0 bytes per page", m.BlockID) + reqCh <- &backendReqMsg{err: fmt.Errorf("block %s has an invalid 0 bytes per page", m.BlockID)} + return } pagesPerQuery := bytesPerRequest / int(bytesPerPage) if pagesPerQuery == 0 { @@ -383,15 +400,19 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req }) if err != nil { - return nil, err + reqCh <- &backendReqMsg{err: err} + return } subR.RequestURI = buildUpstreamRequestURI(parent.URL.Path, subR.URL.Query()) - reqs = append(reqs, subR) + + select { + case reqCh <- &backendReqMsg{req: subR}: + case <-stopCh: + return + } } } - - return reqs, nil } // ingesterRequest returns a new start and end time range for the backend as well as an http request diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index 39581a86114..c27f8401f6b 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -62,6 +62,7 @@ func (m *mockReader) Fetch(ctx context.Context, meta *backend.BlockMeta, req tra func (m *mockReader) EnablePolling(sharder blocklist.JobSharder) {} func (m *mockReader) Shutdown() {} +// jpe - add tests that expect error on this and one below func TestBuildBackendRequests(t *testing.T) { tests := []struct { targetBytesPerRequest int @@ -181,18 +182,32 @@ func TestBuildBackendRequests(t *testing.T) { for _, tc := range tests { req := httptest.NewRequest("GET", "/?k=test&v=test&start=10&end=20", nil) - reqs, err := buildBackendRequests(context.Background(), "test", req, tc.metas, tc.targetBytesPerRequest) - if tc.expectedError != nil { - assert.Equal(t, tc.expectedError, err) - continue - } - assert.NoError(t, err) + stopCh := make(chan struct{}) + defer close(stopCh) + reqCh := make(chan *backendReqMsg) + + go func() { + buildBackendRequests(context.Background(), "test", req, tc.metas, tc.targetBytesPerRequest, reqCh, stopCh) + }() actualURIs := []string{} - for _, r := range reqs { - actualURIs = append(actualURIs, r.RequestURI) + var actualErr error + for r := range reqCh { + if r.err != nil { + actualErr = r.err + break + } + + if r.req != nil { + actualURIs = append(actualURIs, r.req.RequestURI) + } } + if tc.expectedError != nil { + assert.Equal(t, tc.expectedError, actualErr) + continue + } + assert.NoError(t, actualErr) assert.Equal(t, tc.expectedURIs, actualURIs) } } @@ -271,20 +286,29 @@ func TestBackendRequests(t *testing.T) { searchReq, err := api.ParseSearchRequest(r) require.NoError(t, err) - reqs, blocks, err := s.backendRequests(context.TODO(), "test", r, searchReq) - assert.Equal(t, tc.expectedError, err) + stopCh := make(chan struct{}) + defer close(stopCh) + reqCh := make(chan *backendReqMsg) - reqURIs := make([]string, 0) - for _, req := range reqs { - reqURIs = append(reqURIs, req.RequestURI) - } - assert.Equal(t, tc.expectedReqsURIs, reqURIs) + s.backendRequests(context.TODO(), "test", r, 
searchReq, reqCh, stopCh) - blockIDs := make([]string, 0) - for _, block := range blocks { - blockIDs = append(blockIDs, block.BlockID.String()) + var actualErr error + actualBlockIDs := []string{} + actualReqURIs := []string{} + for r := range reqCh { + if r.err != nil { + actualErr = r.err + } + if r.req != nil { + actualReqURIs = append(actualReqURIs, r.req.RequestURI) + } + if r.meta != nil { + actualBlockIDs = append(actualBlockIDs, r.meta.BlockID.String()) + } } - assert.Equal(t, tc.expectedBlockIDs, blockIDs) + assert.Equal(t, tc.expectedError, actualErr) + assert.Equal(t, tc.expectedReqsURIs, actualReqURIs) + assert.Equal(t, tc.expectedBlockIDs, actualBlockIDs) }) } } From b167a7d19889faeec0757ad187964dde105f66ba Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 31 May 2023 13:47:21 -0400 Subject: [PATCH 3/6] tests and benches Signed-off-by: Joe Elliott --- modules/frontend/search_progress.go | 2 +- modules/frontend/searchsharding_test.go | 71 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/modules/frontend/search_progress.go b/modules/frontend/search_progress.go index 155c373aab8..57c0e864d43 100644 --- a/modules/frontend/search_progress.go +++ b/modules/frontend/search_progress.go @@ -117,7 +117,7 @@ func (r *searchProgress) internalShouldQuit() bool { if r.statusCode/100 != 2 { return true } - if r.resultsCombiner.Count() > r.limit { + if r.resultsCombiner.Count() >= r.limit { return true } diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index c27f8401f6b..b4f50d9c784 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -932,3 +932,74 @@ func TestSubRequestsCancelled(t *testing.T) { }) } } + +func BenchmarkSearchSharderRoundTrip5(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 5) } +func BenchmarkSearchSharderRoundTrip500(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 500) } +func BenchmarkSearchSharderRoundTrip50000(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 50000) } // max, forces all queries to run +// jpe does something block if all requests are hit? 
+ +func benchmarkSearchSharderRoundTrip(b *testing.B, s int32) { + resString, err := (&jsonpb.Marshaler{}).MarshalToString(&tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}) + require.NoError(b, err) + + successResString, err := (&jsonpb.Marshaler{}).MarshalToString(&tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "1234", + }, + }, + Metrics: &tempopb.SearchMetrics{}, + }) + require.NoError(b, err) + + succeedAfter := atomic.NewInt32(s) + next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + val := succeedAfter.Dec() + + s := resString + if val == 0 { + s = successResString + } + + return &http.Response{ + Body: io.NopCloser(strings.NewReader(s)), + StatusCode: 200, + }, nil + }) + + o, err := overrides.NewOverrides(overrides.Limits{}) + require.NoError(b, err) + + totalMetas := 10000 + jobsPerMeta := 2 + metas := make([]*backend.BlockMeta, 0, totalMetas) + for i := 0; i < totalMetas; i++ { + metas = append(metas, &backend.BlockMeta{ + StartTime: time.Unix(1100, 0), + EndTime: time.Unix(1200, 0), + Size: defaultTargetBytesPerRequest * uint64(jobsPerMeta), + TotalRecords: uint32(jobsPerMeta), + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), + }) + } + + sharder := newSearchSharder(&mockReader{ + metas: metas, + }, o, SearchSharderConfig{ + ConcurrentRequests: 100, + TargetBytesPerRequest: defaultTargetBytesPerRequest, + }, testSLOcfg, newSearchProgress, log.NewNopLogger()) + testRT := NewRoundTripper(next, sharder) + + req := httptest.NewRequest("GET", "/?start=1000&end=1500&limit=1", nil) // limiting to 1 to let succeedAfter work + ctx := req.Context() + ctx = user.InjectOrgID(ctx, "blerg") + req = req.WithContext(ctx) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + succeedAfter = atomic.NewInt32(s) + _, err = testRT.RoundTrip(req) + require.NoError(b, err) + } +} From 0e2fe7c86a2310d11c1307fd8a99cb5249db2d3c Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 1 Jun 2023 09:35:19 -0400 Subject: [PATCH 4/6] restore stats Signed-off-by: Joe Elliott --- modules/frontend/search_streaming_test.go | 14 +-- modules/frontend/searchsharding.go | 119 ++++++++++++---------- modules/frontend/searchsharding_test.go | 17 +--- 3 files changed, 77 insertions(+), 73 deletions(-) diff --git a/modules/frontend/search_streaming_test.go b/modules/frontend/search_streaming_test.go index 34a7929fef5..e8ff089ddbd 100644 --- a/modules/frontend/search_streaming_test.go +++ b/modules/frontend/search_streaming_test.go @@ -88,10 +88,10 @@ func TestStreamingSearchHandlerSucceeds(t *testing.T) { &tempopb.SearchResponse{ Traces: traceResp, Metrics: &tempopb.SearchMetrics{ - TotalBlocks: 10, // jpe : should be 1 + TotalBlocks: 1, CompletedJobs: 2, - TotalJobs: 10, // jpe : should be 2 - TotalBlockBytes: 1000, // jpe - should be 209715200 + TotalJobs: 2, + TotalBlockBytes: 209715200, }, }, ) @@ -129,10 +129,10 @@ func TestStreamingSearchHandlerStreams(t *testing.T) { &tempopb.SearchResponse{ Traces: traceResp, Metrics: &tempopb.SearchMetrics{ - TotalBlocks: 10, // jpe : should be 1 - CompletedJobs: 1, // jpe : can't get to pass? - TotalJobs: 10, // jpe : should be 2 - TotalBlockBytes: 1000, // jpe - should be 209715200 + TotalBlocks: 1, + CompletedJobs: r.Metrics.CompletedJobs, // jpe ? 
+ TotalJobs: 2, + TotalBlockBytes: 209715200, }}, ) } diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go index c073c42ad33..a1e1432b432 100644 --- a/modules/frontend/searchsharding.go +++ b/modules/frontend/searchsharding.go @@ -74,9 +74,8 @@ type SearchSharderConfig struct { // jpe - pool me? type backendReqMsg struct { - req *http.Request - meta *backend.BlockMeta - err error + req *http.Request + err error } // newSearchSharder creates a sharding middleware for search @@ -150,17 +149,11 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { } // pass subCtx in requests so we can cancel and exit early - s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh) + totalBlocks, totalBlockBytes, totalJobs := s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh) // execute requests wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests)) - - // jpe restore this, can we pull block meta list w/o much penalty? - // totalBlockBytes := uint64(0) - // for _, b := range blocks { - // totalBlockBytes += b.Size - // } - progress := s.progress(ctx, int(searchReq.Limit), 10 /*len(reqs)*/, 10 /*len(blocks)*/, 1000 /*int(totalBlockBytes)*/) + progress := s.progress(ctx, int(searchReq.Limit), totalJobs, totalBlocks, totalBlockBytes) startedReqs := 0 for req := range reqCh { @@ -168,11 +161,6 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { return nil, fmt.Errorf("unexpected err building reqs: %w", req.err) } - if req.meta != nil { - // jpe do something with this? or return metas above? - continue - } - // if shouldQuit is true, terminate and abandon requests if progress.shouldQuit() { break @@ -248,7 +236,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { "query", query, "duration_seconds", reqTime, "request_throughput", throughput, - // "total_requests", len(reqs), jpe + "total_requests", totalJobs, "started_requests", startedReqs, "cancelled_requests", cancelledReqs, "finished_requests", overallResponse.finishedRequests, @@ -299,8 +287,8 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { // blockMetas returns all relevant blockMetas given a start/end func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta { // reduce metas to those in the requested range - metas := []*backend.BlockMeta{} allMetas := s.reader.BlockMetas(tenantID) + metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck for _, m := range allMetas { if m.StartTime.Unix() <= end && m.EndTime.Unix() >= start { @@ -311,31 +299,48 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend return metas } -// backendRequest builds backend requests to search backend blocks -func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) { - go func() { - var blocks []*backend.BlockMeta +// backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. 
+// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs +func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes, estimatedJobs int) { + var blocks []*backend.BlockMeta - // request without start or end, search only in ingester - if searchReq.Start == 0 || searchReq.End == 0 { - close(reqCh) - return - } + // request without start or end, search only in ingester + if searchReq.Start == 0 || searchReq.End == 0 { + close(reqCh) + return + } - // calculate duration (start and end) to search the backend blocks - start, end := backendRange(searchReq, s.cfg.QueryBackendAfter) + // calculate duration (start and end) to search the backend blocks + start, end := backendRange(searchReq, s.cfg.QueryBackendAfter) - // no need to search backend - if start == end { - close(reqCh) - return - } + // no need to search backend + if start == end { + close(reqCh) + return + } + + // get block metadata of blocks in start, end duration + blocks = s.blockMetas(int64(start), int64(end), tenantID) - // get block metadata of blocks in start, end duration - blocks = s.blockMetas(int64(start), int64(end), tenantID) + targetBytesPerRequest := s.cfg.TargetBytesPerRequest + + // calculate metrics to return to the caller + totalBlocks = len(blocks) + for _, b := range blocks { + p := pagesPerRequest(b, targetBytesPerRequest) + + estimatedJobs += int(b.TotalRecords) / p + if int(b.TotalRecords)%p != 0 { + estimatedJobs++ + } + totalBlockBytes += int(b.Size) + } - buildBackendRequests(ctx, tenantID, parent, blocks, s.cfg.TargetBytesPerRequest, reqCh, stopCh) + go func() { + buildBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh) }() + + return } // backendRange returns a new start/end range for the backend based on the config parameter @@ -365,31 +370,20 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req defer close(reqCh) for _, m := range metas { - reqCh <- &backendReqMsg{meta: m} - - if m.Size == 0 || m.TotalRecords == 0 { + pages := pagesPerRequest(m, bytesPerRequest) + if pages == 0 { continue } - bytesPerPage := m.Size / uint64(m.TotalRecords) - if bytesPerPage == 0 { - reqCh <- &backendReqMsg{err: fmt.Errorf("block %s has an invalid 0 bytes per page", m.BlockID)} - return - } - pagesPerQuery := bytesPerRequest / int(bytesPerPage) - if pagesPerQuery == 0 { - pagesPerQuery = 1 // have to have at least 1 page per query - } - blockID := m.BlockID.String() - for startPage := 0; startPage < int(m.TotalRecords); startPage += pagesPerQuery { + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { subR := parent.Clone(ctx) subR.Header.Set(user.OrgIDHeaderName, tenantID) subR, err := api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{ BlockID: blockID, StartPage: uint32(startPage), - PagesToSearch: uint32(pagesPerQuery), + PagesToSearch: uint32(pages), Encoding: m.Encoding.String(), IndexPageSize: m.IndexPageSize, TotalRecords: m.TotalRecords, @@ -415,6 +409,27 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req } } +// pagesPerRequest returns an integer value that indicates the number of pages +// that should be searched per query. 
This value is based on the target number of bytes +// 0 is returned if there is no valid answer +func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int { + if m.Size == 0 || m.TotalRecords == 0 { + return 0 + } + + bytesPerPage := m.Size / uint64(m.TotalRecords) + if bytesPerPage == 0 { + return 0 + } + + pagesPerQuery := bytesPerRequest / int(bytesPerPage) + if pagesPerQuery == 0 { + pagesPerQuery = 1 // have to have at least 1 page per query + } + + return pagesPerQuery +} + // ingesterRequest returns a new start and end time range for the backend as well as an http request // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index b4f50d9c784..abc5d3d1839 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -212,6 +212,7 @@ func TestBuildBackendRequests(t *testing.T) { } } +// jpe add tests for return vals func TestBackendRequests(t *testing.T) { bm := backend.NewBlockMeta("test", uuid.New(), "wdwad", backend.EncGZIP, "asdf") bm.StartTime = time.Unix(100, 0) @@ -228,7 +229,6 @@ func TestBackendRequests(t *testing.T) { name string request string expectedReqsURIs []string - expectedBlockIDs []string expectedError error }{ { @@ -238,8 +238,7 @@ func TestBackendRequests(t *testing.T) { "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=200&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=100&startPage=0&tags=foo%3Dbar&totalRecords=2&version=wdwad", "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=200&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=100&startPage=1&tags=foo%3Dbar&totalRecords=2&version=wdwad", }, - expectedBlockIDs: []string{bm.BlockID.String()}, - expectedError: nil, + expectedError: nil, }, { name: "start and end in block", @@ -248,35 +247,30 @@ func TestBackendRequests(t *testing.T) { "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=150&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=110&startPage=0&tags=foo%3Dbar&totalRecords=2&version=wdwad", "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=150&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=110&startPage=1&tags=foo%3Dbar&totalRecords=2&version=wdwad", }, - expectedBlockIDs: []string{bm.BlockID.String()}, - expectedError: nil, + expectedError: nil, }, { name: "start and end out of block", request: "/?tags=foo%3Dbar&minDuration=10ms&maxDuration=30ms&limit=50&start=10&end=20", expectedReqsURIs: make([]string, 0), - expectedBlockIDs: make([]string, 0), expectedError: nil, }, { name: "no start and end", request: "/?tags=foo%3Dbar&minDuration=10ms&maxDuration=30ms&limit=50", expectedReqsURIs: make([]string, 0), - expectedBlockIDs: make([]string, 0), expectedError: nil, }, { name: "only tags", request: "/?tags=foo%3Dbar", expectedReqsURIs: make([]string, 0), - expectedBlockIDs: make([]string, 0), expectedError: nil, }, { name: "no params", request: "/", expectedReqsURIs: make([]string, 0), - expectedBlockIDs: make([]string, 0), 
expectedError: nil, }, } @@ -293,7 +287,6 @@ func TestBackendRequests(t *testing.T) { s.backendRequests(context.TODO(), "test", r, searchReq, reqCh, stopCh) var actualErr error - actualBlockIDs := []string{} actualReqURIs := []string{} for r := range reqCh { if r.err != nil { @@ -302,13 +295,9 @@ func TestBackendRequests(t *testing.T) { if r.req != nil { actualReqURIs = append(actualReqURIs, r.req.RequestURI) } - if r.meta != nil { - actualBlockIDs = append(actualBlockIDs, r.meta.BlockID.String()) - } } assert.Equal(t, tc.expectedError, actualErr) assert.Equal(t, tc.expectedReqsURIs, actualReqURIs) - assert.Equal(t, tc.expectedBlockIDs, actualBlockIDs) }) } } From f45315ddd6caad433e31b8936db9a73bc7a539aa Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 1 Jun 2023 10:09:09 -0400 Subject: [PATCH 5/6] changelog Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 901f48e3586..74970228909 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ To make use of filtering, configure `autocomplete_filtering_enabled`. * [ENHANCEMENT] Add span filtering to spanmetrics processor [#2274](https://github.com/grafana/tempo/pull/2274) (@zalegrala) * [ENHANCEMENT] Add ability to detect virtual nodes in the servicegraph processor [#2365](https://github.com/grafana/tempo/pull/2365) (@mapno) * [ENHANCEMENT] Introduce `overrides.Interface` to decouple implementation from usage [#2482](https://github.com/grafana/tempo/pull/2482) (@kvrhdn) +* [ENHANCEMENT] Improve TraceQL throughput by asynchronously creating jobs [#2530](https://github.com/grafana/tempo/pull/2530) (@joe-elliott) * [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167) (@kroksys) * [BUGFIX] metrics-generator: ensure Prometheus will scale up shards when remote write is lagging behind [#2463](https://github.com/grafana/tempo/issues/2463) (@kvrhdn) * [BUGFIX] Fixes issue where matches and other spanset level attributes were not persisted to the TraceQL results. [#2490](https://github.com/grafana/tempo/pull/2490) From 23fbd89c32e5c9573f7a26f8a7d2eb5579bcdc64 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 1 Jun 2023 10:22:48 -0400 Subject: [PATCH 6/6] tests + uint64 totalBlockBytes Signed-off-by: Joe Elliott --- modules/frontend/search_progress.go | 6 ++--- modules/frontend/search_progress_test.go | 3 --- modules/frontend/search_streaming.go | 4 +-- modules/frontend/search_streaming_test.go | 2 +- modules/frontend/searchsharding.go | 11 ++++---- modules/frontend/searchsharding_test.go | 33 ++++++++++++++--------- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/modules/frontend/search_progress.go b/modules/frontend/search_progress.go index 57c0e864d43..552e68fe794 100644 --- a/modules/frontend/search_progress.go +++ b/modules/frontend/search_progress.go @@ -11,7 +11,7 @@ import ( // searchProgressFactory is used to provide a way to construct a shardedSearchProgress to the searchSharder. It exists // so that streaming search can inject and track it's own special progress object -type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress +type searchProgressFactory func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress // shardedSearchProgress is an interface that allows us to get progress // events from the search sharding handler. 
@@ -50,7 +50,7 @@ type searchProgress struct { mtx sync.Mutex } -func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress { +func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress { return &searchProgress{ ctx: ctx, statusCode: http.StatusOK, @@ -58,7 +58,7 @@ func newSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, total finishedRequests: 0, resultsMetrics: &tempopb.SearchMetrics{ TotalBlocks: uint32(totalBlocks), - TotalBlockBytes: uint64(totalBlockBytes), + TotalBlockBytes: totalBlockBytes, TotalJobs: uint32(totalJobs), }, resultsCombiner: traceql.NewMetadataCombiner(), diff --git a/modules/frontend/search_progress_test.go b/modules/frontend/search_progress_test.go index b4994a71663..26f5d853bd9 100644 --- a/modules/frontend/search_progress_test.go +++ b/modules/frontend/search_progress_test.go @@ -70,9 +70,6 @@ func TestSearchProgressShouldQuit(t *testing.T) { { TraceID: "otherthing", }, - { - TraceID: "thingthatsdifferent", - }, }, Metrics: &tempopb.SearchMetrics{}, }) diff --git a/modules/frontend/search_streaming.go b/modules/frontend/search_streaming.go index 616aa46a5e9..f6e95465afd 100644 --- a/modules/frontend/search_streaming.go +++ b/modules/frontend/search_streaming.go @@ -32,7 +32,7 @@ type diffSearchProgress struct { mtx sync.Mutex } -func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) *diffSearchProgress { +func newDiffSearchProgress(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) *diffSearchProgress { return &diffSearchProgress{ seenTraces: map[string]struct{}{}, progress: newSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes), @@ -119,7 +119,7 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt } progress := atomic.NewPointer[*diffSearchProgress](nil) - fn := func(ctx context.Context, limit, totalJobs, totalBlocks, totalBlockBytes int) shardedSearchProgress { + fn := func(ctx context.Context, limit, totalJobs, totalBlocks int, totalBlockBytes uint64) shardedSearchProgress { p := newDiffSearchProgress(ctx, limit, totalJobs, totalBlocks, totalBlockBytes) progress.Store(&p) return p diff --git a/modules/frontend/search_streaming_test.go b/modules/frontend/search_streaming_test.go index e8ff089ddbd..33f7946fa92 100644 --- a/modules/frontend/search_streaming_test.go +++ b/modules/frontend/search_streaming_test.go @@ -130,7 +130,7 @@ func TestStreamingSearchHandlerStreams(t *testing.T) { Traces: traceResp, Metrics: &tempopb.SearchMetrics{ TotalBlocks: 1, - CompletedJobs: r.Metrics.CompletedJobs, // jpe ? + CompletedJobs: r.Metrics.CompletedJobs, TotalJobs: 2, TotalBlockBytes: 209715200, }}, diff --git a/modules/frontend/searchsharding.go b/modules/frontend/searchsharding.go index a1e1432b432..7a22747a400 100644 --- a/modules/frontend/searchsharding.go +++ b/modules/frontend/searchsharding.go @@ -72,7 +72,6 @@ type SearchSharderConfig struct { QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"` } -// jpe - pool me? 
type backendReqMsg struct { req *http.Request err error @@ -149,7 +148,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) { } // pass subCtx in requests so we can cancel and exit early - totalBlocks, totalBlockBytes, totalJobs := s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh) + totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(subCtx, tenantID, r, searchReq, reqCh, stopCh) // execute requests wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests)) @@ -301,7 +300,7 @@ func (s *searchSharder) blockMetas(start, end int64, tenantID string) []*backend // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. // it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes, estimatedJobs int) { +func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) (totalJobs, totalBlocks int, totalBlockBytes uint64) { var blocks []*backend.BlockMeta // request without start or end, search only in ingester @@ -329,11 +328,11 @@ func (s *searchSharder) backendRequests(ctx context.Context, tenantID string, pa for _, b := range blocks { p := pagesPerRequest(b, targetBytesPerRequest) - estimatedJobs += int(b.TotalRecords) / p + totalJobs += int(b.TotalRecords) / p if int(b.TotalRecords)%p != 0 { - estimatedJobs++ + totalJobs++ } - totalBlockBytes += int(b.Size) + totalBlockBytes += b.Size } go func() { diff --git a/modules/frontend/searchsharding_test.go b/modules/frontend/searchsharding_test.go index abc5d3d1839..c69636c7f82 100644 --- a/modules/frontend/searchsharding_test.go +++ b/modules/frontend/searchsharding_test.go @@ -62,7 +62,6 @@ func (m *mockReader) Fetch(ctx context.Context, meta *backend.BlockMeta, req tra func (m *mockReader) EnablePolling(sharder blocklist.JobSharder) {} func (m *mockReader) Shutdown() {} -// jpe - add tests that expect error on this and one below func TestBuildBackendRequests(t *testing.T) { tests := []struct { targetBytesPerRequest int @@ -212,7 +211,6 @@ func TestBuildBackendRequests(t *testing.T) { } } -// jpe add tests for return vals func TestBackendRequests(t *testing.T) { bm := backend.NewBlockMeta("test", uuid.New(), "wdwad", backend.EncGZIP, "asdf") bm.StartTime = time.Unix(100, 0) @@ -226,10 +224,13 @@ func TestBackendRequests(t *testing.T) { } tests := []struct { - name string - request string - expectedReqsURIs []string - expectedError error + name string + request string + expectedReqsURIs []string + expectedError error + expectedJobs int + expectedBlocks int + expectedBlockBytes uint64 }{ { name: "start and end same as block", @@ -238,7 +239,10 @@ func TestBackendRequests(t *testing.T) { "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=200&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=100&startPage=0&tags=foo%3Dbar&totalRecords=2&version=wdwad", "/querier?blockID=" + bm.BlockID.String() + 
"&dataEncoding=asdf&encoding=gzip&end=200&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=100&startPage=1&tags=foo%3Dbar&totalRecords=2&version=wdwad", }, - expectedError: nil, + expectedError: nil, + expectedJobs: 2, + expectedBlocks: 1, + expectedBlockBytes: defaultTargetBytesPerRequest * 2, }, { name: "start and end in block", @@ -247,7 +251,10 @@ func TestBackendRequests(t *testing.T) { "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=150&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=110&startPage=0&tags=foo%3Dbar&totalRecords=2&version=wdwad", "/querier?blockID=" + bm.BlockID.String() + "&dataEncoding=asdf&encoding=gzip&end=150&footerSize=0&indexPageSize=0&limit=50&maxDuration=30ms&minDuration=10ms&pagesToSearch=1&size=209715200&start=110&startPage=1&tags=foo%3Dbar&totalRecords=2&version=wdwad", }, - expectedError: nil, + expectedError: nil, + expectedJobs: 2, + expectedBlocks: 1, + expectedBlockBytes: defaultTargetBytesPerRequest * 2, }, { name: "start and end out of block", @@ -284,7 +291,10 @@ func TestBackendRequests(t *testing.T) { defer close(stopCh) reqCh := make(chan *backendReqMsg) - s.backendRequests(context.TODO(), "test", r, searchReq, reqCh, stopCh) + jobs, blocks, blockBytes := s.backendRequests(context.TODO(), "test", r, searchReq, reqCh, stopCh) + require.Equal(t, tc.expectedJobs, jobs) + require.Equal(t, tc.expectedBlocks, blocks) + require.Equal(t, tc.expectedBlockBytes, blockBytes) var actualErr error actualReqURIs := []string{} @@ -296,8 +306,8 @@ func TestBackendRequests(t *testing.T) { actualReqURIs = append(actualReqURIs, r.req.RequestURI) } } - assert.Equal(t, tc.expectedError, actualErr) - assert.Equal(t, tc.expectedReqsURIs, actualReqURIs) + require.Equal(t, tc.expectedError, actualErr) + require.Equal(t, tc.expectedReqsURIs, actualReqURIs) }) } } @@ -925,7 +935,6 @@ func TestSubRequestsCancelled(t *testing.T) { func BenchmarkSearchSharderRoundTrip5(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 5) } func BenchmarkSearchSharderRoundTrip500(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 500) } func BenchmarkSearchSharderRoundTrip50000(b *testing.B) { benchmarkSearchSharderRoundTrip(b, 50000) } // max, forces all queries to run -// jpe does something block if all requests are hit? func benchmarkSearchSharderRoundTrip(b *testing.B, s int32) { resString, err := (&jsonpb.Marshaler{}).MarshalToString(&tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}})
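
Taken together, these patches replace the up-front slice of backend sub-requests with a channel that streams jobs to the RoundTrip consumer as they are built (patch 2), then restore the TotalJobs/TotalBlocks/TotalBlockBytes metrics by estimating them from block metadata alone via the new pagesPerRequest helper (patches 4 and 6). The following is a minimal, self-contained sketch of that producer/consumer shape and the job-count math; the types and names here (blockMeta, reqMsg, buildJobs) are simplified stand-ins for illustration, not Tempo's actual API.

// Sketch only: mirrors the channel-based job streaming and pagesPerRequest
// estimate introduced in this series, with simplified stand-in types.
package main

import "fmt"

// blockMeta is a stand-in for backend.BlockMeta with only the fields the job math needs.
type blockMeta struct {
	ID           string
	Size         uint64 // total bytes in the block
	TotalRecords uint32 // total pages in the block
}

// reqMsg mirrors the backendReqMsg idea: either a job or an error.
type reqMsg struct {
	job string // in Tempo this is an *http.Request
	err error
}

// pagesPerRequest mirrors the helper added in this series: how many pages a single
// sub-request should cover to stay near bytesPerRequest. 0 means the block is skipped.
func pagesPerRequest(m blockMeta, bytesPerRequest int) int {
	if m.Size == 0 || m.TotalRecords == 0 {
		return 0
	}
	bytesPerPage := m.Size / uint64(m.TotalRecords)
	if bytesPerPage == 0 {
		return 0
	}
	pages := bytesPerRequest / int(bytesPerPage)
	if pages == 0 {
		pages = 1 // always search at least one page per request
	}
	return pages
}

// buildJobs streams one job per page range to reqCh and closes it when done.
// stopCh lets the consumer abandon the producer early without leaking the goroutine.
func buildJobs(metas []blockMeta, bytesPerRequest int, reqCh chan<- reqMsg, stopCh <-chan struct{}) {
	defer close(reqCh)
	for _, m := range metas {
		pages := pagesPerRequest(m, bytesPerRequest)
		if pages == 0 {
			continue
		}
		for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
			select {
			case reqCh <- reqMsg{job: fmt.Sprintf("block=%s startPage=%d pages=%d", m.ID, startPage, pages)}:
			case <-stopCh: // consumer quit early (limit reached, error, cancellation)
				return
			}
		}
	}
}

func main() {
	metas := []blockMeta{
		{ID: "a", Size: 4 << 20, TotalRecords: 4}, // 1 MiB per page -> 2 pages per job
		{ID: "b", Size: 0, TotalRecords: 0},       // empty block, skipped
	}
	bytesPerRequest := 2 << 20

	// Totals can still be computed cheaply from the metas alone, without building
	// any requests, which is how the later patches restore the search metrics.
	totalJobs := 0
	for _, m := range metas {
		if p := pagesPerRequest(m, bytesPerRequest); p > 0 {
			totalJobs += int(m.TotalRecords) / p
			if int(m.TotalRecords)%p != 0 {
				totalJobs++
			}
		}
	}
	fmt.Println("estimated jobs:", totalJobs)

	reqCh := make(chan reqMsg, 1) // buffer of 1 leaves room for an ingester job, as in the patch
	stopCh := make(chan struct{})
	defer close(stopCh)

	go buildJobs(metas, bytesPerRequest, reqCh, stopCh)

	for msg := range reqCh {
		if msg.err != nil {
			fmt.Println("error:", msg.err)
			break
		}
		fmt.Println("dispatch:", msg.job)
	}
}

In this shape the consumer can stop consuming at any point (limit hit, sub-request failure, context cancelled) and the producer unwinds via stopCh, which is the same reason the patch threads a stopCh through backendRequests and buildBackendRequests instead of returning a fully materialized request slice.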