Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a startTime and endTime parameter to the trace by id query API to improve query performance #1388

Merged
merged 6 commits into from
Apr 18, 2022
Merged
2 changes: 1 addition & 1 deletion modules/frontend/searchsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type mockReader struct {
metas []*backend.BlockMeta
}

func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]*tempopb.Trace, []error, error) {
func (m *mockReader) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string, timeStart int64, timeEnd int64) ([]*tempopb.Trace, []error, error) {
return nil, nil, nil
}

Expand Down
52 changes: 43 additions & 9 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"time"

"github.com/golang/protobuf/jsonpb"
Expand All @@ -22,6 +23,8 @@ const (
BlockStartKey = "blockStart"
BlockEndKey = "blockEnd"
QueryModeKey = "mode"
TimeStartKey = "timeStart"
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
TimeEndKey = "timeEnd"

QueryModeIngesters = "ingesters"
QueryModeBlocks = "blocks"
Expand All @@ -44,7 +47,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
}

// validate request
blockStart, blockEnd, queryMode, err := validateAndSanitizeRequest(r)
blockStart, blockEnd, queryMode, timeStart, timeEnd, err := validateAndSanitizeRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand All @@ -53,14 +56,16 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
ot_log.String("msg", "validated request"),
ot_log.String("blockStart", blockStart),
ot_log.String("blockEnd", blockEnd),
ot_log.String("queryMode", queryMode))
ot_log.String("queryMode", queryMode),
ot_log.String("timeStart", fmt.Sprint(timeStart)),
ot_log.String("timeEnd", fmt.Sprint(timeEnd)))

resp, err := q.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: byteID,
BlockStart: blockStart,
BlockEnd: blockEnd,
QueryMode: queryMode,
})
}, timeStart, timeEnd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -99,36 +104,40 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
}

// return values are (blockStart, blockEnd, queryMode, error)
func validateAndSanitizeRequest(r *http.Request) (string, string, string, error) {
func validateAndSanitizeRequest(r *http.Request) (string, string, string, int64, int64, error) {
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
q := r.URL.Query().Get(QueryModeKey)

// validate queryMode. it should either be empty or one of (QueryModeIngesters|QueryModeBlocks|QueryModeAll)
var queryMode string
var startTime int64
var endTime int64
if len(q) == 0 || q == QueryModeAll {
queryMode = QueryModeAll
} else if q == QueryModeIngesters {
queryMode = QueryModeIngesters
} else if q == QueryModeBlocks {
queryMode = QueryModeBlocks
} else {
return "", "", "", fmt.Errorf("invalid value for mode %s", q)
return "", "", "", 0, 0, fmt.Errorf("invalid value for mode %s", q)
}

// no need to validate/sanitize other parameters if queryMode == QueryModeIngesters
if queryMode == QueryModeIngesters {
return "", "", queryMode, nil
return "", "", queryMode, 0, 0, nil
}

start := r.URL.Query().Get(BlockStartKey)
end := r.URL.Query().Get(BlockEndKey)
timeStart := r.URL.Query().Get(TimeStartKey)
timeEnd := r.URL.Query().Get(TimeEndKey)

// validate start. it should either be empty or a valid uuid
if len(start) == 0 {
start = tempodb.BlockIDMin
} else {
_, err := uuid.Parse(start)
if err != nil {
return "", "", "", errors.Wrap(err, "invalid value for blockStart")
return "", "", "", 0, 0, errors.Wrap(err, "invalid value for blockStart")
}
}

Expand All @@ -138,11 +147,36 @@ func validateAndSanitizeRequest(r *http.Request) (string, string, string, error)
} else {
_, err := uuid.Parse(end)
if err != nil {
return "", "", "", errors.Wrap(err, "invalid value for blockEnd")
return "", "", "", 0, 0, errors.Wrap(err, "invalid value for blockEnd")
}
}

return start, end, queryMode, nil
if len(timeStart) == 0 {
startTime = 0
} else {
var err error
startTime, err = strconv.ParseInt(timeStart, 10, 64)
if err != nil {
return "", "", "", 0, 0, errors.Wrap(err, "invalid value for timeStart")
}
}

// validate timeEnd. it should either be empty or a valid time
if len(timeEnd) == 0 {
endTime = 0
} else {
var err error
endTime, err = strconv.ParseInt(timeEnd, 10, 64)
if err != nil {
return "", "", "", 0, 0, errors.Wrap(err, "invalid value for timeEnd")
}
}

if endTime < startTime {
return "", "", "", 0, 0, errors.Wrap(errors.New("endTime can not be less than startTime"), "invalid value for timeEnd")
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
}

return start, end, queryMode, startTime, endTime, nil
}

func (q *Querier) SearchHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
6 changes: 4 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (q *Querier) stopping(_ error) error {
}

// FindTraceByID implements tempopb.Querier.
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest) (*tempopb.TraceByIDResponse, error) {
func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequest, timeStart int64, timeEnd int64) (*tempopb.TraceByIDResponse, error) {
if !validation.ValidTraceID(req.TraceID) {
return nil, fmt.Errorf("invalid trace id")
}
Expand Down Expand Up @@ -217,7 +217,9 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
var failedBlocks int
if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, blockErrs, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
span.LogFields(ot_log.String("timeStart", fmt.Sprint(timeStart)))
span.LogFields(ot_log.String("timeEnd", fmt.Sprint(timeEnd)))
partialTraces, blockErrs, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}
Expand Down
6 changes: 3 additions & 3 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestCompaction(t *testing.T) {

// now see if we can find our ids
for i, id := range allIds {
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax)
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax, 0, 0)
require.NoError(t, err)
require.Nil(t, failedBlocks)
require.NotNil(t, trs)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestSameIDCompaction(t *testing.T) {

// search for all ids
for i, id := range allIds {
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax)
trs, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax, 0, 0)
assert.NoError(t, err)
assert.Nil(t, failedBlocks)

Expand Down Expand Up @@ -373,7 +373,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
// Make sure all expected traces are found.
for i := 0; i < blockCount; i++ {
for j := 0; j < recordCount; j++ {
trace, failedBlocks, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j), BlockIDMin, BlockIDMax)
trace, failedBlocks, err := rw.Find(context.TODO(), testTenantID, makeTraceID(i, j), BlockIDMin, BlockIDMax, 0, 0)
require.NotNil(t, trace)
require.Greater(t, len(trace), 0)
require.NoError(t, err)
Expand Down
20 changes: 13 additions & 7 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Writer interface {
type IterateObjectCallback func(id common.ID, obj []byte) bool

type Reader interface {
Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]*tempopb.Trace, []error, error)
Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string, timeStart int64, timeEnd int64) ([]*tempopb.Trace, []error, error)
Search(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchRequest, opts common.SearchOptions) (*tempopb.SearchResponse, error)
BlockMetas(tenantID string) []*backend.BlockMeta
EnablePolling(sharder blocklist.JobSharder)
Expand Down Expand Up @@ -285,7 +285,7 @@ func (rw *readerWriter) BlockMetas(tenantID string) []*backend.BlockMeta {
return rw.blocklist.Metas(tenantID)
}

func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]*tempopb.Trace, []error, error) {
func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string, timeStart int64, timeEnd int64) ([]*tempopb.Trace, []error, error) {
// tracing instrumentation
logger := log.WithContext(ctx, log.Logger)
span, ctx := opentracing.StartSpanFromContext(ctx, "store.Find")
Expand Down Expand Up @@ -316,13 +316,13 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
compactedBlocksSearched := 0

for _, b := range blocklist {
if includeBlock(b, id, blockStartBytes, blockEndBytes) {
if includeBlock(b, id, blockStartBytes, blockEndBytes, timeStart, timeEnd) {
copiedBlocklist = append(copiedBlocklist, b)
blocksSearched++
}
}
for _, c := range compactedBlocklist {
if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll) {
if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll, timeStart, timeEnd) {
copiedBlocklist = append(copiedBlocklist, &c.BlockMeta)
compactedBlocksSearched++
}
Expand Down Expand Up @@ -489,11 +489,17 @@ func (rw *readerWriter) getWriterForBlock(meta *backend.BlockMeta, curTime time.
}

// includeBlock indicates whether a given block should be included in a backend search
func includeBlock(b *backend.BlockMeta, id common.ID, blockStart []byte, blockEnd []byte) bool {
func includeBlock(b *backend.BlockMeta, id common.ID, blockStart []byte, blockEnd []byte, timeStart int64, timeEnd int64) bool {
if bytes.Compare(id, b.MinID) == -1 || bytes.Compare(id, b.MaxID) == 1 {
return false
}

if timeStart != 0 && timeEnd != 0 {
if b.StartTime.Unix() >= timeEnd || b.EndTime.Unix() <= timeStart {
return false
}
}

blockIDBytes, _ := b.BlockID.MarshalBinary()
// check block is in shard boundaries
// blockStartBytes <= blockIDBytes <= blockEndBytes
Expand All @@ -505,10 +511,10 @@ func includeBlock(b *backend.BlockMeta, id common.ID, blockStart []byte, blockEn
}

// if block is compacted within lookback period, and is within shard ranges, include it in search
func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart []byte, blockEnd []byte, poll time.Duration) bool {
func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart []byte, blockEnd []byte, poll time.Duration, timeStart int64, timeEnd int64) bool {
lookback := time.Now().Add(-(2 * poll))
if c.CompactedTime.Before(lookback) {
return false
}
return includeBlock(&c.BlockMeta, id, blockStart, blockEnd)
return includeBlock(&c.BlockMeta, id, blockStart, blockEnd, timeStart, timeEnd)
}
Loading