From 654efd32fb327489cde550047f931ce51ef11e93 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 15:24:49 +0200 Subject: [PATCH 01/20] Search: drop use of TagCache, extract tags and tag values on-demand --- modules/ingester/flush.go | 3 - modules/ingester/ingester_search.go | 4 +- modules/ingester/instance.go | 6 -- modules/ingester/instance_search.go | 118 +++++++++++++++++++++-- tempodb/search/backend_search_block.go | 38 ++++++++ tempodb/search/searchable_block.go | 2 + tempodb/search/streaming_search_block.go | 16 +++ 7 files changed, 166 insertions(+), 21 deletions(-) diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go index b7cddaa5847..1530ec240b8 100644 --- a/modules/ingester/flush.go +++ b/modules/ingester/flush.go @@ -179,9 +179,6 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) { if err != nil { level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err) } - - // periodically purge tag cache, keep tags within complete block timeout (i.e. data that is locally) - instance.PurgeExpiredSearchTags(time.Now().Add(-i.cfg.CompleteBlockTimeout)) } func (i *Ingester) flushLoop(j int) { diff --git a/modules/ingester/ingester_search.go b/modules/ingester/ingester_search.go index 1a7e057ce0d..f1c6806c40b 100644 --- a/modules/ingester/ingester_search.go +++ b/modules/ingester/ingester_search.go @@ -37,7 +37,7 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques return &tempopb.SearchTagsResponse{}, nil } - tags := inst.GetSearchTags() + tags := inst.GetSearchTags(ctx) resp := &tempopb.SearchTagsResponse{ TagNames: tags, @@ -56,7 +56,7 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa return &tempopb.SearchTagValuesResponse{}, nil } - vals := inst.GetSearchTagValues(req.TagName) + vals := inst.GetSearchTagValues(ctx, req.TagName) resp := &tempopb.SearchTagValuesResponse{ TagValues: vals, diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index e71d6f6c790..f6f60a27884 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -75,7 +75,6 @@ type instance struct { searchHeadBlock *searchStreamingBlockEntry searchAppendBlocks map[*wal.AppendBlock]*searchStreamingBlockEntry searchCompleteBlocks map[*wal.LocalBlock]*searchLocalBlockEntry - searchTagCache *search.TagCache lastBlockCut time.Time @@ -107,7 +106,6 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l * traces: map[uint32]*trace{}, searchAppendBlocks: map[*wal.AppendBlock]*searchStreamingBlockEntry{}, searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{}, - searchTagCache: search.NewTagCache(), instanceID: instanceID, tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID), @@ -176,10 +174,6 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte, return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err) } - if searchData != nil { - i.RecordSearchLookupValues(searchData) - } - i.tracesMtx.Lock() defer i.tracesMtx.Unlock() diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 9d6819db51a..356cc9ec2bf 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -3,7 +3,6 @@ package ingester import ( "context" "sort" - "time" cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" @@ -181,19 +180,118 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr } } -func (i *instance) GetSearchTags() []string { - return i.searchTagCache.GetNames() +func (i *instance) GetSearchTags(ctx context.Context) []string { + tags := map[string]struct{}{} + + i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { + kv := &tempofb.KeyValues{} + + for i, ii := 0, entry.TagsLength(); i < ii; i++ { + entry.Tags(kv, i) + tags[string(kv.Key())] = struct{}{} + } + }) + + extractTagsFromSearchableBlock := func(block search.SearchableBlock) { + err := block.Tags(ctx, tags) + if err != nil { + // TODO + panic(err) + } + } + i.blocksMtx.RLock() + i.visitSearchableBlocksWAL(ctx, extractTagsFromSearchableBlock) + i.visitSearchableBlocksLocalBlocks(ctx, extractTagsFromSearchableBlock) + i.blocksMtx.RUnlock() + + tagsSlice := make([]string, 0, len(tags)) + for tag := range tags { + tagsSlice = append(tagsSlice, tag) + } + + return tagsSlice +} + +func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) []string { + values := map[string]struct{}{} + + i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { + kv := &tempofb.KeyValues{} + + for i, tagsLength := 0, entry.TagsLength(); i < tagsLength; i++ { + entry.Tags(kv, i) + + if string(kv.Key()) == tagName { + for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { + values[string(kv.Value(j))] = struct{}{} + } + break + } + } + }) + + extractTagValuesFromSearchableBlocks := func(block search.SearchableBlock) { + err := block.TagValues(ctx, tagName, values) + if err != nil { + // TODO + panic(err) + } + } + i.blocksMtx.RLock() + i.visitSearchableBlocksWAL(ctx, extractTagValuesFromSearchableBlocks) + i.visitSearchableBlocksLocalBlocks(ctx, extractTagValuesFromSearchableBlocks) + i.blocksMtx.RUnlock() + + valuesSlice := make([]string, 0, len(values)) + for tag := range values { + valuesSlice = append(valuesSlice, tag) + } + + return valuesSlice } -func (i *instance) GetSearchTagValues(tagName string) []string { - return i.searchTagCache.GetValues(tagName) +func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func(entry *tempofb.SearchEntry)) { + span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces") + defer span.Finish() + + i.tracesMtx.Lock() + defer i.tracesMtx.Unlock() + + for _, t := range i.traces { + for _, s := range t.searchData { + visit(tempofb.SearchEntryFromBytes(s)) + } + } } -func (i *instance) RecordSearchLookupValues(b []byte) { - s := tempofb.SearchEntryFromBytes(b) - i.searchTagCache.SetData(time.Now(), s) +// visitSearchableBlocksWAL visits every WAL block. Must be called under lock. +func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(block search.SearchableBlock)) { + span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL") + defer span.Finish() + + visitUnderLock := func(entry *searchStreamingBlockEntry) { + entry.mtx.RLock() + defer entry.mtx.RUnlock() + + visit(entry.b) + } + + visitUnderLock(i.searchHeadBlock) + for _, b := range i.searchAppendBlocks { + visitUnderLock(b) + } + } -func (i *instance) PurgeExpiredSearchTags(before time.Time) { - i.searchTagCache.PurgeExpired(before) +// visitSearchableBlocksWAL visits every local block. Must be called under lock. +func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit func(block search.SearchableBlock)) { + span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks") + defer span.Finish() + + for _, b := range i.searchCompleteBlocks { + b.mtx.RLock() + defer b.mtx.RUnlock() + + visit(b.b) + } } diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index fbd4230bd34..2ce66e7a4c1 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -140,6 +140,44 @@ func (s *BackendSearchBlock) BlockID() uuid.UUID { return s.id } +func (s *BackendSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { + hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) + if err != nil { + return err + } + header := tempofb.GetRootAsSearchBlockHeader(hb, 0) + + kv := &tempofb.KeyValues{} + for i, ii := 0, header.TagsLength(); i < ii; i++ { + header.Tags(kv, i) + tags[string(kv.Key())] = struct{}{} + } + + return nil +} + +func (s *BackendSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error { + hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) + if err != nil { + return err + } + header := tempofb.GetRootAsSearchBlockHeader(hb, 0) + + kv := &tempofb.KeyValues{} + for i, tagsLength := 0, header.TagsLength(); i < tagsLength; i++ { + header.Tags(kv, i) + + if string(kv.Key()) == tag { + for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { + tagValues[string(kv.Value(j))] = struct{}{} + } + break + } + } + + return nil +} + // Search iterates through the block looking for matches. func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error { var pageBuf []byte diff --git a/tempodb/search/searchable_block.go b/tempodb/search/searchable_block.go index f8aedbca0cd..ea04bb69bab 100644 --- a/tempodb/search/searchable_block.go +++ b/tempodb/search/searchable_block.go @@ -5,6 +5,8 @@ import ( ) type SearchableBlock interface { + Tags(ctx context.Context, tags map[string]struct{}) error + TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error Search(ctx context.Context, p Pipeline, sr *Results) error } diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 4060e0c6522..5c1768799ec 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -85,6 +85,22 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD return s.appender.Append(id, combined) } +func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { + for k, _ := range s.header.Tags { + tags[k] = struct{}{} + } + return nil +} + +func (s *StreamingSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error { + if values, ok := s.header.Tags[tag]; ok { + for _, v := range values { + tagValues[v] = struct{}{} + } + } + return nil +} + // Search the streaming block. func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error { if s.closed.Load() { From f4faba82e87b7686ae92dbe588394dcd65b752f6 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 16:29:31 +0200 Subject: [PATCH 02/20] Fix compile-error in tests --- modules/ingester/instance_search_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index 499e7f5014e..65dc10f41b1 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -246,11 +246,11 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { }) go concurrent(func() { - i.GetSearchTags() + i.GetSearchTags(context.Background()) }) go concurrent(func() { - i.GetSearchTagValues(tagKey) + i.GetSearchTagValues(context.Background(), tagKey) }) time.Sleep(2000 * time.Millisecond) From e8b5d5f5a1aaeb161d2be577d3c77330f3479dd3 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 16:32:57 +0200 Subject: [PATCH 03/20] fmt --- tempodb/search/streaming_search_block.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 5c1768799ec..058c45b0405 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -86,7 +86,7 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD } func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { - for k, _ := range s.header.Tags { + for k := range s.header.Tags { tags[k] = struct{}{} } return nil From b02db6c91cc5550a03eeff27bb772f8b4e8c8aa6 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 18:20:01 +0200 Subject: [PATCH 04/20] Remove TagCache :cat-salute: --- tempodb/search/tag_cache.go | 120 ------------------------------- tempodb/search/tag_cache_test.go | 55 -------------- 2 files changed, 175 deletions(-) delete mode 100644 tempodb/search/tag_cache.go delete mode 100644 tempodb/search/tag_cache_test.go diff --git a/tempodb/search/tag_cache.go b/tempodb/search/tag_cache.go deleted file mode 100644 index 423204c8369..00000000000 --- a/tempodb/search/tag_cache.go +++ /dev/null @@ -1,120 +0,0 @@ -package search - -import ( - "math" - "sort" - "sync" - "time" - - "github.com/grafana/tempo/pkg/tempofb" -) - -type CacheEntry struct { - values map[string]int64 // value -> unix timestamp -} - -const maxValuesPerTag = 50 - -type TagCache struct { - lookups map[string]*CacheEntry - mtx sync.RWMutex -} - -func NewTagCache() *TagCache { - return &TagCache{ - lookups: map[string]*CacheEntry{}, - } -} - -func (s *TagCache) GetNames() []string { - s.mtx.RLock() - tags := make([]string, 0, len(s.lookups)) - for k := range s.lookups { - tags = append(tags, k) - } - s.mtx.RUnlock() - - sort.Strings(tags) - return tags -} - -func (s *TagCache) GetValues(tagName string) []string { - var vals []string - - s.mtx.RLock() - if e := s.lookups[tagName]; e != nil { - vals = make([]string, 0, len(e.values)) - for v := range e.values { - vals = append(vals, v) - } - } - s.mtx.RUnlock() - - sort.Strings(vals) - return vals -} - -func (s *TagCache) SetData(ts time.Time, data *tempofb.SearchEntry) { - s.mtx.Lock() - defer s.mtx.Unlock() - - tsUnix := ts.Unix() - kv := &tempofb.KeyValues{} - - l := data.TagsLength() - for j := 0; j < l; j++ { - data.Tags(kv, j) - key := string(kv.Key()) - l2 := kv.ValueLength() - for k := 0; k < l2; k++ { - s.setEntry(tsUnix, key, string(kv.Value(k))) - } - } -} - -// setEntry should be called under lock. -func (s *TagCache) setEntry(ts int64, k, v string) { - e := s.lookups[k] - if e == nil { - // First entry - s.lookups[k] = &CacheEntry{values: map[string]int64{v: ts}} - return - } - - // Prune oldest as needed - for len(e.values) >= maxValuesPerTag { - earliestv := "" - earliestts := int64(math.MaxInt64) - - for v, ts := range e.values { - if ts < earliestts { - earliestv = v - earliestts = ts - } - } - - delete(e.values, earliestv) - } - - e.values[v] = ts -} - -func (s *TagCache) PurgeExpired(before time.Time) { - s.mtx.Lock() - defer s.mtx.Unlock() - - beforeUnix := before.Unix() - - for k, e := range s.lookups { - for v, ts := range e.values { - if ts < beforeUnix { - delete(e.values, v) - } - } - - // Remove tags when all values deleted - if len(e.values) <= 0 { - delete(s.lookups, k) - } - } -} diff --git a/tempodb/search/tag_cache_test.go b/tempodb/search/tag_cache_test.go deleted file mode 100644 index 913a5e6cdca..00000000000 --- a/tempodb/search/tag_cache_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package search - -import ( - "fmt" - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestSearchTagCacheGetNames(t *testing.T) { - c := NewTagCache() - c.setEntry(0, "k1", "v1") - c.setEntry(0, "k1", "v2") - require.Equal(t, []string{"k1"}, c.GetNames()) -} - -func TestSearchTagCacheMaxValuesPerTag(t *testing.T) { - c := NewTagCache() - - for i := 0; i < maxValuesPerTag+1; i++ { - c.setEntry(int64(i), "k", fmt.Sprintf("v%02d", i)) - } - - vals := c.GetValues("k") - - require.Len(t, vals, maxValuesPerTag) - require.Equal(t, "v01", vals[0]) // oldest v0 was evicted - require.Equal(t, fmt.Sprintf("v%02d", maxValuesPerTag), vals[maxValuesPerTag-1]) -} - -func TestSearchTagCachePurge(t *testing.T) { - c := NewTagCache() - - oneMinuteAgo := time.Now().Add(-1 * time.Minute) - twoMinutesAgo := time.Now().Add(-2 * time.Minute) - - c.setEntry(twoMinutesAgo.Unix(), "j", "a") - c.setEntry(twoMinutesAgo.Unix(), "k", "a") - c.setEntry(oneMinuteAgo.Unix(), "k", "b") - - c.PurgeExpired(oneMinuteAgo) - - require.Equal(t, []string{"k"}, c.GetNames()) // Empty tags purged - require.Equal(t, []string{"b"}, c.GetValues("k")) // Old values purged -} - -func BenchmarkSearchTagCacheSetEntry(b *testing.B) { - c := NewTagCache() - - for i := 0; i < b.N; i++ { - c.setEntry(int64(i), "k", strconv.Itoa(b.N)) - } -} From 8a177edb9afa648a66bbd81a055e9ab91106d765 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 18:48:59 +0200 Subject: [PATCH 05/20] Add error handling --- modules/ingester/ingester_search.go | 22 ++++---- modules/ingester/instance_search.go | 88 +++++++++++++++++++---------- 2 files changed, 69 insertions(+), 41 deletions(-) diff --git a/modules/ingester/ingester_search.go b/modules/ingester/ingester_search.go index f1c6806c40b..c1caf43a56e 100644 --- a/modules/ingester/ingester_search.go +++ b/modules/ingester/ingester_search.go @@ -37,13 +37,14 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques return &tempopb.SearchTagsResponse{}, nil } - tags := inst.GetSearchTags(ctx) - - resp := &tempopb.SearchTagsResponse{ - TagNames: tags, + tags, err := inst.GetSearchTags(ctx) + if err != nil { + return nil, err } - return resp, nil + return &tempopb.SearchTagsResponse{ + TagNames: tags, + }, nil } func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) { @@ -56,11 +57,12 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa return &tempopb.SearchTagValuesResponse{}, nil } - vals := inst.GetSearchTagValues(ctx, req.TagName) - - resp := &tempopb.SearchTagValuesResponse{ - TagValues: vals, + vals, err := inst.GetSearchTagValues(ctx, req.TagName) + if err != nil { + return nil, err } - return resp, nil + return &tempopb.SearchTagValuesResponse{ + TagValues: vals, + }, nil } diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 356cc9ec2bf..2eace9b5539 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -180,7 +180,7 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr } } -func (i *instance) GetSearchTags(ctx context.Context) []string { +func (i *instance) GetSearchTags(ctx context.Context) ([]string, error) { tags := map[string]struct{}{} i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { @@ -192,27 +192,32 @@ func (i *instance) GetSearchTags(ctx context.Context) []string { } }) - extractTagsFromSearchableBlock := func(block search.SearchableBlock) { - err := block.Tags(ctx, tags) + extractTagsFromSearchableBlock := func(block search.SearchableBlock) error { + return block.Tags(ctx, tags) + } + err := func() error { + i.blocksMtx.RLock() + defer i.blocksMtx.RUnlock() + + err := i.visitSearchableBlocksWAL(ctx, extractTagsFromSearchableBlock) if err != nil { - // TODO - panic(err) + return err } + return i.visitSearchableBlocksLocalBlocks(ctx, extractTagsFromSearchableBlock) + }() + if err != nil { + return nil, err } - i.blocksMtx.RLock() - i.visitSearchableBlocksWAL(ctx, extractTagsFromSearchableBlock) - i.visitSearchableBlocksLocalBlocks(ctx, extractTagsFromSearchableBlock) - i.blocksMtx.RUnlock() tagsSlice := make([]string, 0, len(tags)) for tag := range tags { tagsSlice = append(tagsSlice, tag) } - return tagsSlice + return tagsSlice, nil } -func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) []string { +func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) ([]string, error) { values := map[string]struct{}{} i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { @@ -230,24 +235,30 @@ func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) []str } }) - extractTagValuesFromSearchableBlocks := func(block search.SearchableBlock) { - err := block.TagValues(ctx, tagName, values) + extractTagValuesFromSearchableBlocks := func(block search.SearchableBlock) error { + return block.TagValues(ctx, tagName, values) + } + + err := func() error { + i.blocksMtx.RLock() + defer i.blocksMtx.RUnlock() + + err := i.visitSearchableBlocksWAL(ctx, extractTagValuesFromSearchableBlocks) if err != nil { - // TODO - panic(err) + return err } + return i.visitSearchableBlocksLocalBlocks(ctx, extractTagValuesFromSearchableBlocks) + }() + if err != nil { + return nil, err } - i.blocksMtx.RLock() - i.visitSearchableBlocksWAL(ctx, extractTagValuesFromSearchableBlocks) - i.visitSearchableBlocksLocalBlocks(ctx, extractTagValuesFromSearchableBlocks) - i.blocksMtx.RUnlock() valuesSlice := make([]string, 0, len(values)) for tag := range values { valuesSlice = append(valuesSlice, tag) } - return valuesSlice + return valuesSlice, nil } func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func(entry *tempofb.SearchEntry)) { @@ -265,33 +276,48 @@ func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func( } // visitSearchableBlocksWAL visits every WAL block. Must be called under lock. -func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(block search.SearchableBlock)) { +func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(block search.SearchableBlock) error) error { span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL") defer span.Finish() - visitUnderLock := func(entry *searchStreamingBlockEntry) { + visitUnderLock := func(entry *searchStreamingBlockEntry) error { entry.mtx.RLock() defer entry.mtx.RUnlock() - visit(entry.b) + return visit(entry.b) } - visitUnderLock(i.searchHeadBlock) - for _, b := range i.searchAppendBlocks { - visitUnderLock(b) + err := visitUnderLock(i.searchHeadBlock) + if err != nil { + return err } + for _, b := range i.searchAppendBlocks { + err := visitUnderLock(b) + if err != nil { + return err + } + } + return nil } // visitSearchableBlocksWAL visits every local block. Must be called under lock. -func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit func(block search.SearchableBlock)) { +func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit func(block search.SearchableBlock) error) error { span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks") defer span.Finish() - for _, b := range i.searchCompleteBlocks { - b.mtx.RLock() - defer b.mtx.RUnlock() + visitUnderLock := func(entry *searchLocalBlockEntry) error { + entry.mtx.RLock() + defer entry.mtx.RUnlock() + + return visit(entry.b) + } - visit(b.b) + for _, b := range i.searchCompleteBlocks { + err := visitUnderLock(b) + if err != nil { + return err + } } + return nil } From 30c453eb5d75cf30a41f198b7217b24beaf8a915 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Thu, 21 Oct 2021 18:57:24 +0200 Subject: [PATCH 06/20] lint --- modules/ingester/instance_search_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index 65dc10f41b1..83b1713a2ac 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -246,11 +246,13 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { }) go concurrent(func() { - i.GetSearchTags(context.Background()) + _, err := i.GetSearchTags(context.Background()) + require.NoError(t, err, "error getting search tags") }) go concurrent(func() { - i.GetSearchTagValues(context.Background(), tagKey) + _, err := i.GetSearchTagValues(context.Background(), tagKey) + require.NoError(t, err, "error getting search tag values") }) time.Sleep(2000 * time.Millisecond) From ec96dc01b0e939a5f8763a1c48f982c434a00837 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Sun, 24 Oct 2021 03:14:34 +0200 Subject: [PATCH 07/20] Clean up and optimisations --- modules/ingester/instance_search.go | 118 +++++++++++++---------- tempodb/search/backend_search_block.go | 29 ++++-- tempodb/search/searchable_block.go | 2 +- tempodb/search/streaming_search_block.go | 13 ++- 4 files changed, 97 insertions(+), 65 deletions(-) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 2eace9b5539..e512a7808a2 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -1,6 +1,7 @@ package ingester import ( + "bytes" "context" "sort" @@ -183,85 +184,64 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr func (i *instance) GetSearchTags(ctx context.Context) ([]string, error) { tags := map[string]struct{}{} - i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { - kv := &tempofb.KeyValues{} - + kv := &tempofb.KeyValues{} + err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, ii := 0, entry.TagsLength(); i < ii; i++ { entry.Tags(kv, i) - tags[string(kv.Key())] = struct{}{} + if _, ok := tags[string(kv.Key())]; !ok { + tags[string(kv.Key())] = struct{}{} + } } }) - - extractTagsFromSearchableBlock := func(block search.SearchableBlock) error { - return block.Tags(ctx, tags) - } - err := func() error { - i.blocksMtx.RLock() - defer i.blocksMtx.RUnlock() - - err := i.visitSearchableBlocksWAL(ctx, extractTagsFromSearchableBlock) - if err != nil { - return err - } - return i.visitSearchableBlocksLocalBlocks(ctx, extractTagsFromSearchableBlock) - }() if err != nil { return nil, err } - tagsSlice := make([]string, 0, len(tags)) - for tag := range tags { - tagsSlice = append(tagsSlice, tag) + err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error { + return block.Tags(ctx, tags) + }) + if err != nil { + return nil, err } - return tagsSlice, nil + return extractKeys(tags), nil } func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) ([]string, error) { values := map[string]struct{}{} - i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { - kv := &tempofb.KeyValues{} - + kv := &tempofb.KeyValues{} + tagNameBytes := []byte(tagName) + err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, tagsLength := 0, entry.TagsLength(); i < tagsLength; i++ { entry.Tags(kv, i) - if string(kv.Key()) == tagName { + // TODO use binary search? + if bytes.Equal(kv.Key(), tagNameBytes) { for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { - values[string(kv.Value(j))] = struct{}{} + if _, ok := values[string(kv.Value(j))]; !ok { + values[string(kv.Value(j))] = struct{}{} + } } break } } }) - - extractTagValuesFromSearchableBlocks := func(block search.SearchableBlock) error { - return block.TagValues(ctx, tagName, values) - } - - err := func() error { - i.blocksMtx.RLock() - defer i.blocksMtx.RUnlock() - - err := i.visitSearchableBlocksWAL(ctx, extractTagValuesFromSearchableBlocks) - if err != nil { - return err - } - return i.visitSearchableBlocksLocalBlocks(ctx, extractTagValuesFromSearchableBlocks) - }() if err != nil { return nil, err } - valuesSlice := make([]string, 0, len(values)) - for tag := range values { - valuesSlice = append(valuesSlice, tag) + err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error { + return block.TagValues(ctx, tagName, values) + }) + if err != nil { + return nil, err } - return valuesSlice, nil + return extractKeys(values), nil } -func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func(entry *tempofb.SearchEntry)) { +func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn func(entry *tempofb.SearchEntry)) error { span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces") defer span.Finish() @@ -270,13 +250,30 @@ func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visit func( for _, t := range i.traces { for _, s := range t.searchData { - visit(tempofb.SearchEntryFromBytes(s)) + visitFn(tempofb.SearchEntryFromBytes(s)) + + if err := ctx.Err(); err != nil { + return err + } } } + return nil +} + +func (i *instance) visitSearchableBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error { + i.blocksMtx.RLock() + defer i.blocksMtx.RUnlock() + + err := i.visitSearchableBlocksWAL(ctx, visitFn) + if err != nil { + return err + } + + return i.visitSearchableBlocksLocalBlocks(ctx, visitFn) } // visitSearchableBlocksWAL visits every WAL block. Must be called under lock. -func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(block search.SearchableBlock) error) error { +func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(block search.SearchableBlock) error) error { span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL") defer span.Finish() @@ -284,25 +281,31 @@ func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visit func(bloc entry.mtx.RLock() defer entry.mtx.RUnlock() - return visit(entry.b) + return visitFn(entry.b) } err := visitUnderLock(i.searchHeadBlock) if err != nil { return err } + if err := ctx.Err(); err != nil { + return err + } for _, b := range i.searchAppendBlocks { err := visitUnderLock(b) if err != nil { return err } + if err := ctx.Err(); err != nil { + return err + } } return nil } // visitSearchableBlocksWAL visits every local block. Must be called under lock. -func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit func(block search.SearchableBlock) error) error { +func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error { span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks") defer span.Finish() @@ -310,7 +313,7 @@ func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit f entry.mtx.RLock() defer entry.mtx.RUnlock() - return visit(entry.b) + return visitFn(entry.b) } for _, b := range i.searchCompleteBlocks { @@ -318,6 +321,17 @@ func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visit f if err != nil { return err } + if err := ctx.Err(); err != nil { + return err + } } return nil } + +func extractKeys(set map[string]struct{}) []string { + keys := make([]string, 0, len(set)) + for k := range set { + keys = append(keys, k) + } + return keys +} diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index 2ce66e7a4c1..95bf7d28545 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -1,6 +1,7 @@ package search import ( + "bytes" "context" "io" @@ -141,35 +142,39 @@ func (s *BackendSearchBlock) BlockID() uuid.UUID { } func (s *BackendSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { - hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) + header, err := s.readSearchHeader(ctx) if err != nil { return err } - header := tempofb.GetRootAsSearchBlockHeader(hb, 0) kv := &tempofb.KeyValues{} for i, ii := 0, header.TagsLength(); i < ii; i++ { header.Tags(kv, i) - tags[string(kv.Key())] = struct{}{} + if _, ok := tags[string(kv.Key())]; !ok { + tags[string(kv.Key())] = struct{}{} + } } return nil } -func (s *BackendSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error { - hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) +func (s *BackendSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error { + header, err := s.readSearchHeader(ctx) if err != nil { return err } - header := tempofb.GetRootAsSearchBlockHeader(hb, 0) kv := &tempofb.KeyValues{} + tagNameBytes := []byte(tagName) for i, tagsLength := 0, header.TagsLength(); i < tagsLength; i++ { header.Tags(kv, i) - if string(kv.Key()) == tag { + // TODO use binary search? + if bytes.Equal(kv.Key(), tagNameBytes) { for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { - tagValues[string(kv.Value(j))] = struct{}{} + if _, ok := tagValues[string(kv.Value(j))]; !ok { + tagValues[string(kv.Value(j))] = struct{}{} + } } break } @@ -294,3 +299,11 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results return nil } + +func (s *BackendSearchBlock) readSearchHeader(ctx context.Context) (*tempofb.SearchBlockHeader, error) { + hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) + if err != nil { + return nil, err + } + return tempofb.GetRootAsSearchBlockHeader(hb, 0), nil +} diff --git a/tempodb/search/searchable_block.go b/tempodb/search/searchable_block.go index ea04bb69bab..110e3426bfe 100644 --- a/tempodb/search/searchable_block.go +++ b/tempodb/search/searchable_block.go @@ -6,7 +6,7 @@ import ( type SearchableBlock interface { Tags(ctx context.Context, tags map[string]struct{}) error - TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error + TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error Search(ctx context.Context, p Pipeline, sr *Results) error } diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 058c45b0405..bdffca4d6ea 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -87,15 +87,20 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { for k := range s.header.Tags { - tags[k] = struct{}{} + if _, ok := tags[k]; !ok { + tags[k] = struct{}{} + } } return nil } -func (s *StreamingSearchBlock) TagValues(ctx context.Context, tag string, tagValues map[string]struct{}) error { - if values, ok := s.header.Tags[tag]; ok { +func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error { + if values, ok := s.header.Tags[tagName]; ok { for _, v := range values { - tagValues[v] = struct{}{} + // TODO use binary search? + if _, ok := tagValues[v]; !ok { + tagValues[v] = struct{}{} + } } } return nil From 5f2d55351f9cec69db8874e5f94b9a56d4fbac9f Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Tue, 26 Oct 2021 17:37:43 +0200 Subject: [PATCH 08/20] Fix compilation errors --- tempodb/search/streaming_search_block.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 80971f5ddac..84363cdac17 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -90,23 +90,24 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD } func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { - for k := range s.header.Tags { + s.header.Tags.Range(func(k, v string) { if _, ok := tags[k]; !ok { tags[k] = struct{}{} } - } + }) return nil } func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error { - if values, ok := s.header.Tags[tagName]; ok { - for _, v := range values { - // TODO use binary search? + // TODO optimize to grab all values of tagName + s.header.Tags.Range(func(k, v string) { + if tagName == k { if _, ok := tagValues[v]; !ok { tagValues[v] = struct{}{} } + } - } + }) return nil } From f4b62c6e862d8cce09525bd00527b26fda94af80 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Tue, 26 Oct 2021 19:50:59 +0200 Subject: [PATCH 09/20] Refactor methods for consistency with instance.Search --- modules/ingester/ingester_search.go | 12 ++++-------- modules/ingester/instance_search.go | 18 ++++++++++++------ modules/ingester/instance_search_test.go | 4 ++-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/modules/ingester/ingester_search.go b/modules/ingester/ingester_search.go index c1caf43a56e..da356d781f2 100644 --- a/modules/ingester/ingester_search.go +++ b/modules/ingester/ingester_search.go @@ -37,14 +37,12 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques return &tempopb.SearchTagsResponse{}, nil } - tags, err := inst.GetSearchTags(ctx) + res, err := inst.SearchTags(ctx) if err != nil { return nil, err } - return &tempopb.SearchTagsResponse{ - TagNames: tags, - }, nil + return res, nil } func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) { @@ -57,12 +55,10 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa return &tempopb.SearchTagValuesResponse{}, nil } - vals, err := inst.GetSearchTagValues(ctx, req.TagName) + res, err := inst.SearchTagValues(ctx, req.TagName) if err != nil { return nil, err } - return &tempopb.SearchTagValuesResponse{ - TagValues: vals, - }, nil + return res, nil } diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index e512a7808a2..43999029924 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -181,13 +181,14 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr } } -func (i *instance) GetSearchTags(ctx context.Context) ([]string, error) { +func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) { tags := map[string]struct{}{} kv := &tempofb.KeyValues{} err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, ii := 0, entry.TagsLength(); i < ii; i++ { entry.Tags(kv, i) + // check the tag is already set, this is more performant with repetitive values if _, ok := tags[string(kv.Key())]; !ok { tags[string(kv.Key())] = struct{}{} } @@ -204,21 +205,24 @@ func (i *instance) GetSearchTags(ctx context.Context) ([]string, error) { return nil, err } - return extractKeys(tags), nil + return &tempopb.SearchTagsResponse{ + TagNames: extractKeys(tags), + }, nil } -func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) ([]string, error) { +func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) { values := map[string]struct{}{} kv := &tempofb.KeyValues{} tagNameBytes := []byte(tagName) err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { + // TODO use binary search here for i, tagsLength := 0, entry.TagsLength(); i < tagsLength; i++ { entry.Tags(kv, i) - - // TODO use binary search? if bytes.Equal(kv.Key(), tagNameBytes) { + for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { + // check the value is already set, this is more performant with repetitive values if _, ok := values[string(kv.Value(j))]; !ok { values[string(kv.Value(j))] = struct{}{} } @@ -238,7 +242,9 @@ func (i *instance) GetSearchTagValues(ctx context.Context, tagName string) ([]st return nil, err } - return extractKeys(values), nil + return &tempopb.SearchTagValuesResponse{ + TagValues: extractKeys(values), + }, nil } func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn func(entry *tempofb.SearchEntry)) error { diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index 800e8630f42..e5423974031 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -247,12 +247,12 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { }) go concurrent(func() { - _, err := i.GetSearchTags(context.Background()) + _, err := i.SearchTags(context.Background()) require.NoError(t, err, "error getting search tags") }) go concurrent(func() { - _, err := i.GetSearchTagValues(context.Background(), tagKey) + _, err := i.SearchTagValues(context.Background(), tagKey) require.NoError(t, err, "error getting search tag values") }) From d16fef70bbe55fa5a9ede975b4e3b2f4c506f6a6 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Mon, 25 Oct 2021 15:46:57 -0400 Subject: [PATCH 10/20] Tweak tag lookup binary search to eliminate last comparison and fetch of data from flatbuffers. Add new FindTag function --- pkg/tempofb/searchdata_test.go | 31 +++++++++++++++++ pkg/tempofb/searchdata_util.go | 61 +++++++++++++++++++++++----------- 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/pkg/tempofb/searchdata_test.go b/pkg/tempofb/searchdata_test.go index e49a23a3e77..2b03ab13d44 100644 --- a/pkg/tempofb/searchdata_test.go +++ b/pkg/tempofb/searchdata_test.go @@ -91,3 +91,34 @@ func TestEncodingSize(t *testing.T) { fmt.Printf(" - Tag: %.1f bytes after\n", float32(tagValueLongTermTags-tagValueBaseLine)/float32(delta)) fmt.Printf(" - Value: %.1f bytes after\n", float32(tagValueLongTermValues-tagValueBaseLine)/float32(delta)) } + +func TestContainsTag(t *testing.T) { + m := &SearchEntryMutable{} + m.AddTag("key1", "value") + m.AddTag("key2", "value") + m.AddTag("key3", "value") + m.AddTag("key4", "value") + m.AddTag("key5", "value") + m.AddTag("key6", "value") + + e := SearchEntryFromBytes(m.ToBytes()) + + kv := &KeyValues{} + + testCases := []struct { + key, value string + found bool + }{ + {"key1", "value", true}, + {"key1", "value2", false}, + {"key6", "value", true}, + {"key0", "value", false}, + {"key10", "value", false}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprint(tc.key, "=", tc.value), func(t *testing.T) { + require.Equal(t, tc.found, ContainsTag(e, kv, []byte(tc.key), []byte(tc.value))) + }) + } +} diff --git a/pkg/tempofb/searchdata_util.go b/pkg/tempofb/searchdata_util.go index af242586c1e..c37da7ebe99 100644 --- a/pkg/tempofb/searchdata_util.go +++ b/pkg/tempofb/searchdata_util.go @@ -157,26 +157,8 @@ type FBTagContainer interface { func ContainsTag(s FBTagContainer, kv *KeyValues, k []byte, v []byte) bool { - matched := -1 - - // Binary search for keys. Flatbuffers are written backwards so - // keys are descending (the comparison is reversed). - // TODO - We only want exact matches, sort.Search has to make an - // extra comparison. We should fork it to make use of the full - // tri-state response from bytes.Compare - sort.Search(s.TagsLength(), func(i int) bool { - s.Tags(kv, i) - comparison := bytes.Compare(k, kv.Key()) - if comparison == 0 { - matched = i - // TODO it'd be great to exit here and retain the data in kv buffer - } - return comparison >= 0 - }) - - if matched >= 0 { - s.Tags(kv, matched) - + kv = FindTag(s, kv, k) + if kv != nil { // Linear search for matching values l := kv.ValueLength() for j := 0; j < l; j++ { @@ -188,3 +170,42 @@ func ContainsTag(s FBTagContainer, kv *KeyValues, k []byte, v []byte) bool { return false } + +func FindTag(s FBTagContainer, kv *KeyValues, k []byte) *KeyValues { + + idx := binarySearch(s.TagsLength(), func(i int) int { + s.Tags(kv, i) + // Note comparison here is backwards because KeyValues are written to flatbuffers in reverse order. + return bytes.Compare(kv.Key(), k) + }) + + if idx >= 0 { + // Data is left in buffer when matched + return kv + } + + return nil +} + +// binarySearch that finds exact matching entry. Returns non-zero index when found, or -1 when not found +// Inspired by sort.Search but makes uses of tri-state comparator to eliminate the last comparison when +// we want to find exact match, not insertion point. +func binarySearch(n int, compare func(int) int) int { + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) // avoid overflow when computing h + // i ≤ h < j + switch compare(h) { + case 0: + // Found exact match + return h + case -1: + j = h + case 1: + i = h + 1 + } + } + + // No match + return -1 +} From ba8966621ec7c1f3f3b843befd3a1c48a9410871 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Tue, 26 Oct 2021 20:16:04 +0200 Subject: [PATCH 11/20] Use tempofb.FindTag --- modules/ingester/instance_search.go | 19 ++++++----------- pkg/tempofb/searchdata_util.go | 1 - tempodb/search/backend_search_block.go | 27 ++++++++++-------------- tempodb/search/streaming_search_block.go | 4 +++- 4 files changed, 21 insertions(+), 30 deletions(-) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 43999029924..73793de4463 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -1,7 +1,6 @@ package ingester import ( - "bytes" "context" "sort" @@ -216,18 +215,14 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop kv := &tempofb.KeyValues{} tagNameBytes := []byte(tagName) err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { - // TODO use binary search here - for i, tagsLength := 0, entry.TagsLength(); i < tagsLength; i++ { - entry.Tags(kv, i) - if bytes.Equal(kv.Key(), tagNameBytes) { - - for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { - // check the value is already set, this is more performant with repetitive values - if _, ok := values[string(kv.Value(j))]; !ok { - values[string(kv.Value(j))] = struct{}{} - } + kv := tempofb.FindTag(entry, kv, tagNameBytes) + if kv != nil { + for i, ii := 0, kv.ValueLength(); i < ii; i++ { + key := string(kv.Value(i)) + // check the value is already set, this is more performant with repetitive values + if _, ok := values[key]; !ok { + values[key] = struct{}{} } - break } } }) diff --git a/pkg/tempofb/searchdata_util.go b/pkg/tempofb/searchdata_util.go index c37da7ebe99..ca9baff40d5 100644 --- a/pkg/tempofb/searchdata_util.go +++ b/pkg/tempofb/searchdata_util.go @@ -2,7 +2,6 @@ package tempofb import ( "bytes" - "sort" flatbuffers "github.com/google/flatbuffers/go" "github.com/grafana/tempo/tempodb/encoding/common" diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index 95bf7d28545..0f71aec17fe 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -1,7 +1,6 @@ package search import ( - "bytes" "context" "io" @@ -150,8 +149,10 @@ func (s *BackendSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) kv := &tempofb.KeyValues{} for i, ii := 0, header.TagsLength(); i < ii; i++ { header.Tags(kv, i) - if _, ok := tags[string(kv.Key())]; !ok { - tags[string(kv.Key())] = struct{}{} + key := string(kv.Key()) + // check the tag is already set, this is more performant with repetitive values + if _, ok := tags[key]; !ok { + tags[key] = struct{}{} } } @@ -164,22 +165,16 @@ func (s *BackendSearchBlock) TagValues(ctx context.Context, tagName string, tagV return err } - kv := &tempofb.KeyValues{} - tagNameBytes := []byte(tagName) - for i, tagsLength := 0, header.TagsLength(); i < tagsLength; i++ { - header.Tags(kv, i) - - // TODO use binary search? - if bytes.Equal(kv.Key(), tagNameBytes) { - for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { - if _, ok := tagValues[string(kv.Value(j))]; !ok { - tagValues[string(kv.Value(j))] = struct{}{} - } + kv := tempofb.FindTag(header, &tempofb.KeyValues{}, []byte(tagName)) + if kv != nil { + for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ { + value := string(kv.Value(j)) + // check the value is already set, this is more performant with repetitive values + if _, ok := tagValues[value]; !ok { + tagValues[value] = struct{}{} } - break } } - return nil } diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 84363cdac17..8ed9274585c 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -91,6 +91,7 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { s.header.Tags.Range(func(k, v string) { + // check the tag is already set, this is more performant with repetitive values if _, ok := tags[k]; !ok { tags[k] = struct{}{} } @@ -99,9 +100,10 @@ func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{ } func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error { - // TODO optimize to grab all values of tagName + // TODO optimize by adding a function to copy all values for a tag s.header.Tags.Range(func(k, v string) { if tagName == k { + // check the value is already set, this is more performant with repetitive values if _, ok := tagValues[v]; !ok { tagValues[v] = struct{}{} } From fbb66c11e1b775dc57d71a96bc7804ea9024ea38 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Wed, 27 Oct 2021 00:11:30 +0200 Subject: [PATCH 12/20] Add SearchDataMap RangeKeys and RangeKeyValues --- pkg/tempofb/searchdatamap.go | 26 ++++++++++ pkg/tempofb/searchdatamap_test.go | 63 ++++++++++++++++++++++++ tempodb/search/streaming_search_block.go | 14 ++---- 3 files changed, 94 insertions(+), 9 deletions(-) diff --git a/pkg/tempofb/searchdatamap.go b/pkg/tempofb/searchdatamap.go index 0f5f09adde4..efa4689c5ac 100644 --- a/pkg/tempofb/searchdatamap.go +++ b/pkg/tempofb/searchdatamap.go @@ -12,6 +12,8 @@ type SearchDataMap interface { Contains(k, v string) bool WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT Range(f func(k, v string)) + RangeKeys(f func(k string)) + RangeKeyValues(k string, f func(v string)) } func NewSearchDataMap() SearchDataMap { @@ -70,6 +72,18 @@ func (s SearchDataMapSmall) Range(f func(k, v string)) { } } +func (s SearchDataMapSmall) RangeKeys(f func(k string)) { + for k, _ := range s { + f(k) + } +} + +func (s SearchDataMapSmall) RangeKeyValues(k string, f func(v string)) { + for _, v := range s[k] { + f(v) + } +} + func (s SearchDataMapSmall) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT { keys := make([]string, 0, len(s)) for k := range s { @@ -115,6 +129,18 @@ func (s SearchDataMapLarge) Range(f func(k, v string)) { } } +func (s SearchDataMapLarge) RangeKeys(f func(k string)) { + for k := range s { + f(k) + } +} + +func (s SearchDataMapLarge) RangeKeyValues(k string, f func(v string)) { + for v := range s[k] { + f(v) + } +} + func (s SearchDataMapLarge) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT { keys := make([]string, 0, len(s)) for k := range s { diff --git a/pkg/tempofb/searchdatamap_test.go b/pkg/tempofb/searchdatamap_test.go index 63c33feb5ef..c6a3d23323d 100644 --- a/pkg/tempofb/searchdatamap_test.go +++ b/pkg/tempofb/searchdatamap_test.go @@ -3,8 +3,71 @@ package tempofb import ( "fmt" "testing" + + "github.com/stretchr/testify/assert" ) +func TestSearchDataMap(t *testing.T) { + testCases := []struct { + name string + impl SearchDataMap + }{ + {"SearchDataMapSmall", &SearchDataMapSmall{}}, + {"SearchDataMapLarge", &SearchDataMapLarge{}}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + searchDataMap := tc.impl + + assert.False(t, searchDataMap.Contains("key-1", "value-1-2")) + + searchDataMap.Add("key-1", "value-1-1") + + assert.False(t, searchDataMap.Contains("key-1", "value-1-2")) + + searchDataMap.Add("key-1", "value-1-2") + searchDataMap.Add("key-2", "value-2-1") + + assert.True(t, searchDataMap.Contains("key-1", "value-1-2")) + assert.False(t, searchDataMap.Contains("key-2", "value-1-2")) + + type Pair struct { + k string + v string + } + var pairs []Pair + capturePairFn := func(k, v string) { + pairs = append(pairs, Pair{k, v}) + } + + searchDataMap.Range(capturePairFn) + assert.ElementsMatch(t, []Pair{{"key-1", "value-1-1"}, {"key-1", "value-1-2"}, {"key-2", "value-2-1"}}, pairs) + + var strs []string + captureSliceFn := func(value string) { + strs = append(strs, value) + } + + searchDataMap.RangeKeys(captureSliceFn) + assert.ElementsMatch(t, []string{"key-1", "key-2"}, strs) + strs = nil + + searchDataMap.RangeKeyValues("key-1", captureSliceFn) + assert.ElementsMatch(t, []string{"value-1-1", "value-1-2"}, strs) + strs = nil + + searchDataMap.RangeKeyValues("key-2", captureSliceFn) + assert.ElementsMatch(t, []string{"value-2-1"}, strs) + strs = nil + + searchDataMap.RangeKeyValues("does-not-exist", captureSliceFn) + assert.ElementsMatch(t, []string{}, strs) + strs = nil + }) + } +} + func BenchmarkSearchDataMapAdd(b *testing.B) { intfs := []struct { name string diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 8ed9274585c..acbe2094a60 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -90,7 +90,7 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD } func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error { - s.header.Tags.Range(func(k, v string) { + s.header.Tags.RangeKeys(func(k string) { // check the tag is already set, this is more performant with repetitive values if _, ok := tags[k]; !ok { tags[k] = struct{}{} @@ -100,14 +100,10 @@ func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{ } func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error { - // TODO optimize by adding a function to copy all values for a tag - s.header.Tags.Range(func(k, v string) { - if tagName == k { - // check the value is already set, this is more performant with repetitive values - if _, ok := tagValues[v]; !ok { - tagValues[v] = struct{}{} - } - + s.header.Tags.RangeKeyValues(tagName, func(v string) { + // check the value is already set, this is more performant with repetitive values + if _, ok := tagValues[v]; !ok { + tagValues[v] = struct{}{} } }) return nil From f3dfef9184f0c7e33fa6357481eb4fd07b9c8402 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Wed, 27 Oct 2021 00:43:22 +0200 Subject: [PATCH 13/20] make fmt --- pkg/tempofb/searchdatamap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tempofb/searchdatamap.go b/pkg/tempofb/searchdatamap.go index efa4689c5ac..cff823b2376 100644 --- a/pkg/tempofb/searchdatamap.go +++ b/pkg/tempofb/searchdatamap.go @@ -73,7 +73,7 @@ func (s SearchDataMapSmall) Range(f func(k, v string)) { } func (s SearchDataMapSmall) RangeKeys(f func(k string)) { - for k, _ := range s { + for k := range s { f(k) } } From 744be0c2db899b76310ddf37aceb72907d0d0098 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Wed, 27 Oct 2021 00:54:05 +0200 Subject: [PATCH 14/20] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf893a6aa3..41361f2433e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ * [ENHANCEMENT] Allow search disablement in vulture [#1069](https://github.com/grafana/tempo/pull/1069) (@zalegrala) * [ENHANCEMENT] Jsonnet: add `$._config.search_enabled`, correctly set `http_api_prefix` in config [#1072](https://github.com/grafana/tempo/pull/1072) (@kvrhdn) * [ENHANCEMENT] Performance: Remove WAL contention between ingest and searches [#1076](https://github.com/grafana/tempo/pull/1076) (@mdisibio) +* [ENHANCEMENT] Search: drop use of TagCache, extract tags and tag values on-demand [#1068](https://github.com/grafana/tempo/pull/1068) (@kvrhdn) * [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala) * [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) * [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) From b8724ee90f205a1859b3a5bb7d7f6173fd236dc7 Mon Sep 17 00:00:00 2001 From: Koenraad Verheyden Date: Wed, 27 Oct 2021 15:04:47 +0200 Subject: [PATCH 15/20] Cast to string once --- modules/ingester/instance_search.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 73793de4463..ea762d1a069 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -187,9 +187,10 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, ii := 0, entry.TagsLength(); i < ii; i++ { entry.Tags(kv, i) + key := string(kv.Key()) // check the tag is already set, this is more performant with repetitive values - if _, ok := tags[string(kv.Key())]; !ok { - tags[string(kv.Key())] = struct{}{} + if _, ok := tags[key]; !ok { + tags[key] = struct{}{} } } }) From 16570c76504ef4036b34540c35ad4962243b2671 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 27 Oct 2021 12:01:18 -0400 Subject: [PATCH 16/20] Reuse SearchEntry buffer where possible --- modules/ingester/instance_search.go | 13 +++++++++---- pkg/tempofb/searchdata_test.go | 2 +- pkg/tempofb/searchdata_util.go | 7 ++++++- tempodb/search/backend_search_block.go | 5 +++-- tempodb/search/data_combiner.go | 5 +++-- tempodb/search/pipeline_test.go | 6 +++--- tempodb/search/rescan_blocks.go | 2 +- tempodb/search/streaming_search_block.go | 6 ++++-- 8 files changed, 30 insertions(+), 16 deletions(-) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 73793de4463..868b6533afd 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -92,6 +92,8 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr * span.LogFields(ot_log.Event("live traces mtx acquired")) + entry := &tempofb.SearchEntry{} // buffer + for _, t := range i.traces { if sr.Quit() { return @@ -105,7 +107,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr * for _, s := range t.searchData { sr.AddBytesInspected(uint64(len(s))) - entry := tempofb.SearchEntryFromBytes(s) + entry.Reset(s) if p.Matches(entry) { newResult := search.GetSearchResultFromData(entry) if result != nil { @@ -187,9 +189,10 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, ii := 0, entry.TagsLength(); i < ii; i++ { entry.Tags(kv, i) + key := string(kv.Value(i)) // check the tag is already set, this is more performant with repetitive values - if _, ok := tags[string(kv.Key())]; !ok { - tags[string(kv.Key())] = struct{}{} + if _, ok := tags[key]; !ok { + tags[key] = struct{}{} } } }) @@ -249,9 +252,11 @@ func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn fun i.tracesMtx.Lock() defer i.tracesMtx.Unlock() + se := &tempofb.SearchEntry{} for _, t := range i.traces { for _, s := range t.searchData { - visitFn(tempofb.SearchEntryFromBytes(s)) + se.Reset(s) + visitFn(se) if err := ctx.Err(); err != nil { return err diff --git a/pkg/tempofb/searchdata_test.go b/pkg/tempofb/searchdata_test.go index 2b03ab13d44..3f521b9e390 100644 --- a/pkg/tempofb/searchdata_test.go +++ b/pkg/tempofb/searchdata_test.go @@ -101,7 +101,7 @@ func TestContainsTag(t *testing.T) { m.AddTag("key5", "value") m.AddTag("key6", "value") - e := SearchEntryFromBytes(m.ToBytes()) + e := NewSearchEntryFromBytes(m.ToBytes()) kv := &KeyValues{} diff --git a/pkg/tempofb/searchdata_util.go b/pkg/tempofb/searchdata_util.go index ca9baff40d5..0591c74a162 100644 --- a/pkg/tempofb/searchdata_util.go +++ b/pkg/tempofb/searchdata_util.go @@ -145,7 +145,12 @@ func (s *SearchEntry) Contains(k []byte, v []byte, buffer *KeyValues) bool { return ContainsTag(s, buffer, k, v) } -func SearchEntryFromBytes(b []byte) *SearchEntry { +func (s *SearchEntry) Reset(b []byte) { + n := flatbuffers.GetUOffsetT(b) + s.Init(b, n) +} + +func NewSearchEntryFromBytes(b []byte) *SearchEntry { return GetRootAsSearchEntry(b, 0) } diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index 0f71aec17fe..2ec51f3e9a6 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -28,7 +28,8 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block var err error ctx := context.TODO() indexPageSize := 100 * 1024 - kv := &tempofb.KeyValues{} // buffer + kv := &tempofb.KeyValues{} // buffer + s := &tempofb.SearchEntry{} // buffer // Pinning specific version instead of latest for safety version, err := encoding.FromVersion("v2") @@ -69,7 +70,7 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block continue } - s := tempofb.SearchEntryFromBytes(data) + s.Reset(data) header.AddEntry(s) diff --git a/tempodb/search/data_combiner.go b/tempodb/search/data_combiner.go index 5b51846837b..1c919350789 100644 --- a/tempodb/search/data_combiner.go +++ b/tempodb/search/data_combiner.go @@ -23,13 +23,14 @@ func (*DataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) { // Squash all datas into 1 data := tempofb.SearchEntryMutable{} - kv := &tempofb.KeyValues{} // buffer + kv := &tempofb.KeyValues{} // buffer + sd := &tempofb.SearchEntry{} // buffer for _, sb := range searchData { // we append zero-length entries to the WAL even when search is disabled. skipping to prevent unmarshalling and panik :) if len(sb) == 0 { continue } - sd := tempofb.SearchEntryFromBytes(sb) + sd.Reset(sb) for i, ii := 0, sd.TagsLength(); i < ii; i++ { sd.Tags(kv, i) for j, jj := 0, kv.ValueLength(); j < jj; j++ { diff --git a/tempodb/search/pipeline_test.go b/tempodb/search/pipeline_test.go index 02374162a6d..6de433da164 100644 --- a/tempodb/search/pipeline_test.go +++ b/tempodb/search/pipeline_test.go @@ -55,7 +55,7 @@ func TestPipelineMatchesTags(t *testing.T) { data := tempofb.SearchEntryMutable{ Tags: tempofb.NewSearchDataMapWithData(tc.searchData), } - sd := tempofb.SearchEntryFromBytes(data.ToBytes()) + sd := tempofb.NewSearchEntryFromBytes(data.ToBytes()) matches := p.Matches(sd) require.Equal(t, tc.shouldMatch, matches) @@ -125,7 +125,7 @@ func TestPipelineMatchesTraceDuration(t *testing.T) { StartTimeUnixNano: uint64(tc.spanStart), EndTimeUnixNano: uint64(tc.spanEnd), } - sd := tempofb.SearchEntryFromBytes(data.ToBytes()) + sd := tempofb.NewSearchEntryFromBytes(data.ToBytes()) matches := p.Matches(sd) require.Equal(t, tc.shouldMatch, matches) @@ -185,7 +185,7 @@ func TestPipelineMatchesBlock(t *testing.T) { func BenchmarkPipelineMatches(b *testing.B) { - entry := tempofb.SearchEntryFromBytes((&tempofb.SearchEntryMutable{ + entry := tempofb.NewSearchEntryFromBytes((&tempofb.SearchEntryMutable{ StartTimeUnixNano: 0, EndTimeUnixNano: uint64(500 * time.Millisecond / time.Nanosecond), //500ms in nanoseconds Tags: tempofb.NewSearchDataMapWithData(map[string][]string{ diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go index 47d9afeef94..7b6f01553ca 100644 --- a/tempodb/search/rescan_blocks.go +++ b/tempodb/search/rescan_blocks.go @@ -88,7 +88,7 @@ func newStreamingSearchBlockFromWALReplay(searchFilepath, filename string) (*Str blockHeader := tempofb.NewSearchBlockHeaderMutable() records, warning, err := wal.ReplayWALAndGetRecords(f, v, enc, func(bytes []byte) error { - entry := tempofb.SearchEntryFromBytes(bytes) + entry := tempofb.NewSearchEntryFromBytes(bytes) blockHeader.AddEntry(entry) return nil }) diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index acbe2094a60..e9e2b197312 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -83,7 +83,7 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD } s.headerMtx.Lock() - s.header.AddEntry(tempofb.SearchEntryFromBytes(combined)) + s.header.AddEntry(tempofb.NewSearchEntryFromBytes(combined)) s.headerMtx.Unlock() return s.appender.Append(id, combined) @@ -111,6 +111,8 @@ func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, ta // Search the streaming block. func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error { + entry := &tempofb.SearchEntry{} + if s.closed.Load() { // Generally this means block has already been deleted return nil @@ -149,7 +151,7 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul sr.AddBytesInspected(uint64(len(obj))) sr.AddTraceInspected(1) - entry := tempofb.SearchEntryFromBytes(obj) + entry.Reset(obj) if !p.Matches(entry) { continue From ab3b5b4d17e9712029178a8e91b9628837d9ee96 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 27 Oct 2021 14:16:29 -0400 Subject: [PATCH 17/20] fix key/value typo --- modules/ingester/instance_search.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 868b6533afd..892adce560e 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -189,7 +189,7 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { for i, ii := 0, entry.TagsLength(); i < ii; i++ { entry.Tags(kv, i) - key := string(kv.Value(i)) + key := string(kv.Key()) // check the tag is already set, this is more performant with repetitive values if _, ok := tags[key]; !ok { tags[key] = struct{}{} From 362d97c140f369239c2e0558fa5630886928b633 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 25 Nov 2021 19:26:15 +0530 Subject: [PATCH 18/20] Add limit on response size for a tag-values query Signed-off-by: Annanay --- cmd/tempo/app/modules.go | 2 +- .../tempo-search/overrides.yaml | 1 + modules/ingester/instance_search.go | 32 +++++++++++++++++-- modules/overrides/limits.go | 22 +++++++++---- modules/overrides/overrides.go | 5 +++ modules/querier/querier.go | 19 ++++++++--- pkg/util/map_size.go | 14 ++++++++ 7 files changed, 79 insertions(+), 16 deletions(-) create mode 100644 pkg/util/map_size.go diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index b8d4765c536..3ac45ed5d3e 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -310,7 +310,7 @@ func (t *App) setupModuleManager() error { Ring: {Server, MemberlistKV}, Distributor: {Ring, Server, Overrides}, Ingester: {Store, Server, Overrides, MemberlistKV}, - Querier: {Store, Ring}, + Querier: {Store, Ring, Overrides}, Compactor: {Store, Server, Overrides, MemberlistKV}, SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor}, ScalableSingleBinary: {SingleBinary}, diff --git a/example/docker-compose/tempo-search/overrides.yaml b/example/docker-compose/tempo-search/overrides.yaml index 72971ca395f..b431c9006d9 100644 --- a/example/docker-compose/tempo-search/overrides.yaml +++ b/example/docker-compose/tempo-search/overrides.yaml @@ -9,4 +9,5 @@ overrides: max_global_traces_per_user: 0 max_bytes_per_trace: 50000 max_search_bytes_per_trace: 0 + max_bytes_per_tag_values_query: 5000000 block_retention: 0s diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index a7c62b8c733..bf17179e0ad 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -2,12 +2,13 @@ package ingester import ( "context" - "sort" - cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" + "github.com/grafana/tempo/pkg/util" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" + "github.com/weaveworks/common/user" + "sort" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" @@ -215,9 +216,16 @@ func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) { values := map[string]struct{}{} + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + // get limit from override + maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID) + kv := &tempofb.KeyValues{} tagNameBytes := []byte(tagName) - err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { + err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) { kv := tempofb.FindTag(entry, kv, tagNameBytes) if kv != nil { for i, ii := 0, kv.ValueLength(); i < ii; i++ { @@ -233,6 +241,15 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop return nil, err } + // check if size of values map is within limit after scanning live traces + if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) { + level.Warn(cortex_util.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName) + // return empty response to avoid querier OOMs + return &tempopb.SearchTagValuesResponse{ + TagValues: []string{}, + }, nil + } + err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error { return block.TagValues(ctx, tagName, values) }) @@ -240,6 +257,15 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop return nil, err } + // check if size of values map is within limit after scanning all blocks + if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) { + level.Warn(cortex_util.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName) + // return empty response to avoid querier OOMs + return &tempopb.SearchTagValuesResponse{ + TagValues: []string{}, + }, nil + } + return &tempopb.SearchTagValuesResponse{ TagValues: extractKeys(values), }, nil diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go index dd298dfcd7d..03b865353d5 100644 --- a/modules/overrides/limits.go +++ b/modules/overrides/limits.go @@ -21,13 +21,14 @@ const ( ErrorPrefixRateLimited = "RATE_LIMITED:" // metrics - MetricMaxLocalTracesPerUser = "max_local_traces_per_user" - MetricMaxGlobalTracesPerUser = "max_global_traces_per_user" - MetricMaxBytesPerTrace = "max_bytes_per_trace" - MetricMaxSearchBytesPerTrace = "max_search_bytes_per_trace" - MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes" - MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes" - MetricBlockRetention = "block_retention" + MetricMaxLocalTracesPerUser = "max_local_traces_per_user" + MetricMaxGlobalTracesPerUser = "max_global_traces_per_user" + MetricMaxBytesPerTrace = "max_bytes_per_trace" + MetricMaxSearchBytesPerTrace = "max_search_bytes_per_trace" + MetricMaxBytesPerTagValuesQuery = "max_bytes_per_tag_values_query" + MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes" + MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes" + MetricBlockRetention = "block_retention" ) var ( @@ -57,6 +58,9 @@ type Limits struct { // Compactor enforced limits. BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"` + // Querier enforced limits. + MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"` + // Configuration for overrides, convenient if it goes here. PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"` @@ -75,6 +79,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxBytesPerTrace, "ingester.max-bytes-per-trace", 50e5, "Maximum size of a trace in bytes. 0 to disable.") f.IntVar(&l.MaxSearchBytesPerTrace, "ingester.max-search-bytes-per-trace", 50e3, "Maximum size of search data per trace in bytes. 0 to disable.") + // Querier limits + f.IntVar(&l.MaxBytesPerTagValuesQuery, "querier.max-bytes-per-tag-values-query", 50e5, "Maximum size of response for a tag-values query. Used mainly to limit large the number of values associated with a particular tag") + f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") _ = l.PerTenantOverridePeriod.Set("10s") f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") @@ -89,6 +96,7 @@ func (l *Limits) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalTracesPerUser), MetricMaxGlobalTracesPerUser) ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTrace), MetricMaxBytesPerTrace) ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxSearchBytesPerTrace), MetricMaxSearchBytesPerTrace) + ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTagValuesQuery), MetricMaxBytesPerTagValuesQuery) ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionRateLimitBytes), MetricIngestionRateLimitBytes) ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionBurstSizeBytes), MetricIngestionBurstSizeBytes) ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.BlockRetention), MetricBlockRetention) diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go index f6f463214ca..4970ec3cde5 100644 --- a/modules/overrides/overrides.go +++ b/modules/overrides/overrides.go @@ -242,6 +242,11 @@ func (o *Overrides) MaxSearchBytesPerTrace(userID string) int { return o.getOverridesForUser(userID).MaxSearchBytesPerTrace } +// MaxBytesPerTagValuesQuery returns the maximum size of a response to a tag-values query allowed for a user. +func (o *Overrides) MaxBytesPerTagValuesQuery(userID string) int { + return o.getOverridesForUser(userID).MaxBytesPerTagValuesQuery +} + // IngestionRateLimitBytes is the number of spans per second allowed for this tenant. func (o *Overrides) IngestionRateLimitBytes(userID string) float64 { return float64(o.getOverridesForUser(userID).IngestionRateLimitBytes) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 0d1eb1e7d4a..b2fe356c292 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -3,10 +3,6 @@ package querier import ( "context" "fmt" - "net/http" - "sort" - "sync" - cortex_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" @@ -19,6 +15,7 @@ import ( "github.com/grafana/tempo/modules/storage" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/validation" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/search" @@ -30,6 +27,9 @@ import ( httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/user" "go.uber.org/multierr" + "net/http" + "sort" + "sync" ) var ( @@ -329,11 +329,14 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest } func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) { - _, err := user.ExtractOrgID(ctx) + userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTagValues") } + // fetch response size limit for tag-values query + tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID) + replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read) if err != nil { return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTagValues") @@ -360,6 +363,12 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal uniqueMap[v] = struct{}{} } + if !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) { + return &tempopb.SearchTagValuesResponse{ + TagValues: []string{}, + }, nil + } + // Final response (sorted) resp := &tempopb.SearchTagValuesResponse{ TagValues: make([]string, 0, len(uniqueMap)), diff --git a/pkg/util/map_size.go b/pkg/util/map_size.go new file mode 100644 index 00000000000..179328e5c7b --- /dev/null +++ b/pkg/util/map_size.go @@ -0,0 +1,14 @@ +package util + +// MapSizeWithinLimit evaluates the total size of all keys in the map against the limit +func MapSizeWithinLimit(uniqueMap map[string]struct{}, limit int) bool { + var mapSize int + for key := range uniqueMap { + mapSize += len(key) + } + + if mapSize < limit { + return true + } + return false +} \ No newline at end of file From f7e95f0babc5dd5a9ab1f88d94428f994d749509 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 1 Dec 2021 17:56:58 +0530 Subject: [PATCH 19/20] Lint and CHANGELOG Signed-off-by: Annanay --- CHANGELOG.md | 1 + modules/ingester/instance_search.go | 3 ++- modules/querier/querier.go | 7 ++++--- pkg/util/map_size.go | 7 ++----- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5767c4cf016..c1c1834002d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## main / unreleased +* [CHANGE] Search: Add new per-tenant limit `max_bytes_per_tag_values_query` to limit the size of tag-values response. [#1068](https://github.com/grafana/tempo/pull/1068) (@annanay25) * [ENHANCEMENT] Expose `upto` parameter on hedged requests for each backend with `hedge_requests_up_to`. [#1085](https://github.com/grafana/tempo/pull/1085) (@joe-elliott) * [ENHANCEMENT] Search: drop use of TagCache, extract tags and tag values on-demand [#1068](https://github.com/grafana/tempo/pull/1068) (@kvrhdn) * [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index bf17179e0ad..549962f97ef 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -2,13 +2,14 @@ package ingester import ( "context" + "sort" + cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" "github.com/grafana/tempo/pkg/util" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" "github.com/weaveworks/common/user" - "sort" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" diff --git a/modules/querier/querier.go b/modules/querier/querier.go index b2fe356c292..95a9fa49004 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -3,6 +3,10 @@ package querier import ( "context" "fmt" + "net/http" + "sort" + "sync" + cortex_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" @@ -27,9 +31,6 @@ import ( httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/user" "go.uber.org/multierr" - "net/http" - "sort" - "sync" ) var ( diff --git a/pkg/util/map_size.go b/pkg/util/map_size.go index 179328e5c7b..2fea9c15feb 100644 --- a/pkg/util/map_size.go +++ b/pkg/util/map_size.go @@ -7,8 +7,5 @@ func MapSizeWithinLimit(uniqueMap map[string]struct{}, limit int) bool { mapSize += len(key) } - if mapSize < limit { - return true - } - return false -} \ No newline at end of file + return mapSize < limit +} From 535263cada98186ead4faa174fb4ed3470314c54 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 2 Dec 2021 16:16:14 +0530 Subject: [PATCH 20/20] lint, fix test by adding userID to ctx Signed-off-by: Annanay --- modules/ingester/instance_search_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index e5423974031..11e4c2852fd 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -10,6 +10,11 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/atomic" + "github.com/weaveworks/common/user" + "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempofb" @@ -17,9 +22,6 @@ import ( "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/search" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber-go/atomic" ) func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) { @@ -252,7 +254,9 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { }) go concurrent(func() { - _, err := i.SearchTagValues(context.Background(), tagKey) + // SearchTagValues queries now require userID in ctx + ctx := user.InjectOrgID(context.Background(), "test") + _, err := i.SearchTagValues(ctx, tagKey) require.NoError(t, err, "error getting search tag values") })