Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend: Reduce allocs #4007

Merged
merged 9 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [ENHANCEMENT] Prevent massive allocations in the frontend if there is not sufficient pressure from the query pipeline. [#3996](https://github.com/grafana/tempo/pull/3996) (@joe-elliott)
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
* [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)

# v2.6.0-rc.0

Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/cmd-query-metrics-query-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (cmd *metricsQueryCmd) queryRangeHTTP(req *tempopb.QueryRangeRequest) error
return err
}

httpReq = api.BuildQueryRangeRequest(httpReq, req)
httpReq = api.BuildQueryRangeRequest(httpReq, req, "")
httpReq.Header = http.Header{}
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), cmd.OrgID), httpReq)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/metrics_query_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newQueryInstantStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTri
URL: &url.URL{Path: downstreamPath},
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader([]byte{})),
}, qr)
}, qr, "") // dedicated cols are never passed from the caller
httpReq = httpReq.Clone(ctx)

var finalResponse *tempopb.QueryInstantResponse
Expand Down Expand Up @@ -110,7 +110,7 @@ func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripp
// Clone existing to keep it unaltered.
req = req.Clone(req.Context())
req.URL.Path = strings.ReplaceAll(req.URL.Path, api.PathMetricsQueryInstant, api.PathMetricsQueryRange)
req = api.BuildQueryRangeRequest(req, qr)
req = api.BuildQueryRangeRequest(req, qr, "") // dedicated cols are never passed from the caller

combiner, err := combiner.NewTypedQueryRange(qr, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp
URL: &url.URL{Path: downstreamPath},
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader([]byte{})),
}, req)
}, req, "") // dedicated cols are never passed from the caller

ctx := srv.Context()
httpReq = httpReq.WithContext(ctx)
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
Start: uint64(1100 * time.Second),
End: uint64(1200 * time.Second),
Step: uint64(100 * time.Second),
})
}, "")

ctx := user.InjectOrgID(httpReq.Context(), tenant)
httpReq = httpReq.WithContext(ctx)
Expand Down
34 changes: 15 additions & 19 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
defer close(reqCh)

queryHash := hashForQueryRangeRequest(&searchReq)
colsToJSON := api.NewDedicatedColumnsToJSON()

exemplarsPerBlock := s.exemplarsPerShard(uint32(len(metas)))
for _, m := range metas {
Expand All @@ -231,7 +232,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)

dc, err := m.DedicatedColumns.ToTempopb()
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
// errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
Expand All @@ -253,18 +254,18 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
Step: step,
QueryMode: searchReq.QueryMode,
// New RF1 fields
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
Exemplars: exemplars,
BlockID: m.BlockID.String(),
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Version: m.Version,
Encoding: m.Encoding.String(),
Size_: m.Size,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
Exemplars: exemplars,
}

subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON)

prepareRequestForQueriers(subR, tenantID)
pipelineR := pipeline.NewHTTPRequest(subR)
Expand Down Expand Up @@ -302,16 +303,11 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

req := s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID)

return req
}

func (s *queryRangeSharder) toUpstreamRequest(ctx context.Context, req tempopb.QueryRangeRequest, parent *http.Request, tenantID string) *http.Request {
subR := parent.Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &req)
subR := parent.Clone(parent.Context())
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators

prepareRequestForQueriers(subR, tenantID)

return subR
}

Expand Down
27 changes: 14 additions & 13 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req
defer close(reqCh)

queryHash := hashForSearchRequest(searchReq)
colsToJSON := api.NewDedicatedColumnsToJSON()

for _, m := range metas {
pages := pagesPerRequest(m, bytesPerRequest)
Expand All @@ -314,25 +315,25 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)

dc, err := m.DedicatedColumns.ToTempopb()
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
continue
}

subR, err = api.BuildSearchBlockRequest(subR, &tempopb.SearchBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
DataEncoding: m.DataEncoding,
Version: m.Version,
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
})
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
DataEncoding: m.DataEncoding,
Version: m.Version,
Size_: m.Size,
FooterSize: m.FooterSize,
// DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
}, dedColsJSON)
if err != nil {
errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err))
continue
Expand Down
7 changes: 6 additions & 1 deletion modules/querier/external/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package external
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -149,7 +150,11 @@ func (s *Client) Search(ctx context.Context, maxBytes int, searchReq *tempopb.Se
if err != nil {
return nil, fmt.Errorf("external endpoint failed to make new request: %w", err)
}
req, err = api.BuildSearchBlockRequest(req, searchReq)
columnsJSON, err := json.Marshal(searchReq.DedicatedColumns)
if err != nil {
return nil, err
}
req, err = api.BuildSearchBlockRequest(req, searchReq, string(columnsJSON))
if err != nil {
return nil, fmt.Errorf("external endpoint failed to build search block request: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/api/dedicated_columns_to_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package api

import (
"encoding/json"
"unsafe"

"github.com/grafana/tempo/tempodb/backend"
)

type DedicatedColumnsToJSON struct {
columnsToJSON map[uint64]string
}

func NewDedicatedColumnsToJSON() *DedicatedColumnsToJSON {
return &DedicatedColumnsToJSON{
columnsToJSON: make(map[uint64]string),
}
}

func (d *DedicatedColumnsToJSON) JSONForDedicatedColumns(cols backend.DedicatedColumns) (string, error) {
if len(cols) == 0 {
return "", nil
}

hash := cols.Hash()
if jsonString, ok := d.columnsToJSON[hash]; ok {
return jsonString, nil
}

proto, err := cols.ToTempopb()
if err != nil {
return "", err
}

jsonBytes, err := json.Marshal(proto)
if err != nil {
return "", err
}

jsonString := unsafe.String(unsafe.SliceData(jsonBytes), len(jsonBytes))
d.columnsToJSON[hash] = jsonString

return jsonString, nil
}
66 changes: 66 additions & 0 deletions pkg/api/dedicated_columns_to_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package api

import (
"encoding/json"
"math/rand/v2"
"testing"

"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/require"
)

func TestDedicatedColumnsToJson(t *testing.T) {
d := NewDedicatedColumnsToJSON()

testCols := []backend.DedicatedColumns{}
for i := 0; i < 10; i++ {
testCols = append(testCols, randoDedicatedCols())
}

// do all test cols 2x to test caching
for i := 0; i < 2; i++ {
for _, cols := range testCols {
expectedJSON := dedicatedColsToJSON(t, cols)
actualJSON, err := d.JSONForDedicatedColumns(cols)
require.NoError(t, err)

require.Equal(t, expectedJSON, actualJSON, "iteration %d, cols: %v", i, cols)
}
}
}

func dedicatedColsToJSON(t *testing.T, cols backend.DedicatedColumns) string {
t.Helper()

proto, err := cols.ToTempopb()
require.NoError(t, err)

jsonBytes, err := json.Marshal(proto)
require.NoError(t, err)

return string(jsonBytes)
}

// randoDedicatedCols generates a random set of cols for testing
func randoDedicatedCols() backend.DedicatedColumns {
colCount := rand.IntN(5) + 1
ret := make([]backend.DedicatedColumn, 0, colCount)

for i := 0; i < colCount; i++ {
scope := backend.DedicatedColumnScopeSpan
if rand.IntN(2) == 0 {
scope = backend.DedicatedColumnScopeResource
}

col := backend.DedicatedColumn{
Scope: scope,
Name: test.RandomString(),
Type: backend.DedicatedColumnTypeString,
}

ret = append(ret, col)
}

return ret
}
23 changes: 12 additions & 11 deletions pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,10 @@ func BuildQueryInstantRequest(req *http.Request, searchReq *tempopb.QueryInstant
return req
}

func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequest) *http.Request {
// BuildQueryRangeRequest takes a tempopb.QueryRangeRequest and populates the passed http.Request
// dedicatedColumnsJSON should be generated using the DedicatedColumnsToJSON struct which produces the expected string
// value and memoizes results to prevent redundant marshaling.
func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequest, dedicatedColumnsJSON string) *http.Request {
if req == nil {
req = &http.Request{
URL: &url.URL{},
Expand All @@ -466,9 +469,9 @@ func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequ
qb.addParam(urlParamEncoding, searchReq.Encoding)
qb.addParam(urlParamSize, strconv.Itoa(int(searchReq.Size_)))
qb.addParam(urlParamFooterSize, strconv.Itoa(int(searchReq.FooterSize)))
if len(searchReq.DedicatedColumns) > 0 {
columnsJSON, _ := json.Marshal(searchReq.DedicatedColumns)
qb.addParam(urlParamDedicatedColumns, string(columnsJSON))

if len(dedicatedColumnsJSON) > 0 && dedicatedColumnsJSON != "null" { // if a caller marshals a nil dedicated cols we will receive the string "null"
qb.addParam(urlParamDedicatedColumns, dedicatedColumnsJSON)
}

if len(searchReq.Query) > 0 {
Expand Down Expand Up @@ -642,7 +645,9 @@ func BuildSearchRequest(req *http.Request, searchReq *tempopb.SearchRequest) (*h

// BuildSearchBlockRequest takes a tempopb.SearchBlockRequest and populates the passed http.Request
// with the appropriate params. If no http.Request is provided a new one is created.
func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest) (*http.Request, error) {
// dedicatedColumnsJSON should be generated using the DedicatedColumnsToJSON struct which produces the expected string
// value and memoizes results to prevent redundant marshaling.
func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest, dedicatedColumnsJSON string) (*http.Request, error) {
if req == nil {
req = &http.Request{
URL: &url.URL{},
Expand All @@ -665,12 +670,8 @@ func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRe
qb.addParam(urlParamDataEncoding, searchReq.DataEncoding)
qb.addParam(urlParamVersion, searchReq.Version)
qb.addParam(urlParamFooterSize, strconv.FormatUint(uint64(searchReq.FooterSize), 10))
if len(searchReq.DedicatedColumns) > 0 {
columnsJSON, err := json.Marshal(searchReq.DedicatedColumns)
if err != nil {
return nil, err
}
qb.addParam(urlParamDedicatedColumns, string(columnsJSON))
if len(dedicatedColumnsJSON) > 0 && dedicatedColumnsJSON != "null" { // if a caller marshals a nil dedicated cols we will receive the string "null"
qb.addParam(urlParamDedicatedColumns, dedicatedColumnsJSON)
}

req.URL.RawQuery = qb.query()
Expand Down
11 changes: 9 additions & 2 deletions pkg/api/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -477,7 +478,10 @@ func TestBuildSearchBlockRequest(t *testing.T) {
}

for _, tc := range tests {
actualURL, err := BuildSearchBlockRequest(tc.httpReq, tc.req)
jsonBytes, err := json.Marshal(tc.req.DedicatedColumns)
require.NoError(t, err)

actualURL, err := BuildSearchBlockRequest(tc.httpReq, tc.req, string(jsonBytes))
assert.NoError(t, err)
assert.Equal(t, tc.query, actualURL.URL.String())
}
Expand Down Expand Up @@ -716,7 +720,10 @@ func TestQueryRangeRoundtrip(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
httpReq := BuildQueryRangeRequest(nil, tc.req)
jsonBytes, err := json.Marshal(tc.req.DedicatedColumns)
require.NoError(t, err)

httpReq := BuildQueryRangeRequest(nil, tc.req, string(jsonBytes))
actualReq, err := ParseQueryRangeRequest(httpReq)
require.NoError(t, err)
assert.Equal(t, tc.req, actualReq)
Expand Down
Loading