Speedup DistinctValue collector and exit early for ingesters (grafana#4104)

* make the collector go fast...

* fixup usage and log lines

* exit early when we hit the limits of collector

* cleanup

* CHANGELOG.md

* fix lint

* break with goto

* locked and loaded
electron0zero authored and knylander-grafana committed Sep 26, 2024
1 parent f06b671 commit 5032453
Showing 7 changed files with 141 additions and 48 deletions.
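
The heart of the change is that the collector now reports when its byte limit has been hit, so callers (ingesters and queriers) can stop feeding it values instead of measuring everything and discarding the overflow afterwards. A minimal sketch of the caller-side pattern, using the constructor and methods shown in the diffs below with made-up values and limits:

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/collector"
)

func main() {
	// Collector limited to 64 bytes of distinct values; the length function
	// decides how much each value counts against the limit.
	d := collector.NewDistinctValue(64, func(s string) int { return len(s) })

	for i := 0; i < 1000; i++ {
		// Collect returns true once the limit is exceeded, so the caller can
		// bail out early instead of inspecting the remaining values.
		if d.Collect(fmt.Sprintf("value_%d", i)) {
			break
		}
	}

	fmt.Println("exceeded:", d.Exceeded(), "size:", d.Size(), "distinct:", len(d.Values()))
}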
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
4 changes: 2 additions & 2 deletions modules/frontend/combiner/search_tag_values.go
@@ -43,8 +43,8 @@ func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValu
}

func NewSearchTagValuesV2(limitBytes int) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctValue(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })
// Distinct collector with no limit and diff enabled
d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })

return &genericCombiner[*tempopb.SearchTagValuesV2Response]{
httpStatusCode: 200,
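The frontend combiner is the one caller that still needs Diff() to stream partial results back to the client, so it moves to the diff-enabled constructor; ingesters and queriers use the plain constructor and skip that bookkeeping. A short sketch of the difference, with illustrative values:

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/collector"
)

func main() {
	sizeFn := func(s string) int { return len(s) }

	// Plain collector: faster, but Diff() always returns nil.
	fast := collector.NewDistinctValue(0, sizeFn)
	fast.Collect("foo")
	fmt.Println(fast.Diff()) // [] (nil): diff tracking is disabled

	// Diff-enabled collector: tracks values collected since the last Diff call,
	// which is what the streaming SearchTagValuesV2 combiner relies on.
	diffed := collector.NewDistinctValueWithDiff(0, sizeFn)
	diffed.Collect("foo")
	diffed.Collect("bar")
	fmt.Println(diffed.Diff()) // [foo bar] (map order not guaranteed)
}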
2 changes: 1 addition & 1 deletion modules/ingester/instance_search.go
@@ -573,7 +573,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
}

if valueCollector.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", valueCollector.TotalDataSize())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size())
}

resp := &tempopb.SearchTagValuesV2Response{}
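The "_ =" added in front of level.Warn(...).Log(...) acknowledges the error value that go-kit's Log returns; presumably this is what the "fix lint" commit message refers to. The message itself now reports Size(), the bytes actually kept, instead of the removed TotalDataSize().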
27 changes: 25 additions & 2 deletions modules/querier/querier.go
@@ -504,6 +504,9 @@ func (q *Querier) SearchTagsBlocks(ctx context.Context, req *tempopb.SearchTagsB

for _, t := range s.Tags {
distinctValues.Collect(t)
if distinctValues.Exceeded() {
break // stop early
}
}
}

@@ -539,9 +542,14 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err)
}

outerLoop:
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames {
distinctValues.Collect(res)
if distinctValues.Exceeded() {
break outerLoop // break out of all loops
}
}
}

@@ -573,10 +581,14 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque
limit := q.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := collector.NewScopedDistinctString(limit)

outerLoop:
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagsV2Response).Scopes {
for _, tag := range res.Tags {
distinctValues.Collect(res.Name, tag)
if distinctValues.Exceeded() {
break outerLoop // break out of all loops
}
}
}
}
@@ -610,6 +622,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal

// Virtual tags values. Get these first.
for _, v := range search.GetVirtualTagValues(req.TagName) {
// virtual tags are small so no need to stop early here
distinctValues.Collect(v)
}

@@ -619,9 +632,14 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}

outerLoop:
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues {
distinctValues.Collect(res)
if distinctValues.Exceeded() {
break outerLoop // break out of all loops
}
}
}

@@ -648,6 +666,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
// Virtual tags values. Get these first.
virtualVals := search.GetVirtualTagValuesV2(req.TagName)
for _, v := range virtualVals {
// no need to stop early here, virtual tags are small
distinctValues.Collect(v)
}

@@ -664,14 +683,18 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV
if err != nil {
return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err)
}

outerLoop:
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagValuesV2Response).TagValues {
distinctValues.Collect(*res)
if distinctValues.Collect(*res) {
break outerLoop // break out of all loops
}
}
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
}

return valuesToV2Response(distinctValues), nil
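The querier iterates over per-ingester responses and then over the values inside each response, so a plain break would only leave the inner loop; the labeled break shown above exits both at once. A reduced illustration of that control flow, with placeholder data:

package main

import "fmt"

func main() {
	responses := [][]string{{"a", "b"}, {"c", "d"}}
	limit := 3
	collected := 0

outerLoop:
	for _, resp := range responses {
		for _, v := range resp {
			_ = v
			collected++
			if collected >= limit {
				// A plain break here would only end the inner loop; the label
				// stops the iteration over the remaining responses as well.
				break outerLoop
			}
		}
	}

	fmt.Println("collected:", collected) // collected: 3
}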
2 changes: 2 additions & 0 deletions pkg/collector/distinct_string_collector.go
@@ -24,6 +24,8 @@ func NewDistinctString(maxDataSize int) *DistinctString {
}
}

// Collect adds a new value to the distinct string collector.
// The return value indicates whether the value was added.
func (d *DistinctString) Collect(s string) bool {
if _, ok := d.values[s]; ok {
// Already present
107 changes: 65 additions & 42 deletions pkg/collector/distinct_value_collector.go
@@ -5,95 +5,118 @@ import (
)

type DistinctValue[T comparable] struct {
values map[T]struct{}
new map[T]struct{}
len func(T) int
maxLen int
currLen int
totalLen int
mtx sync.RWMutex
values map[T]struct{}
new map[T]struct{}
len func(T) int
maxLen int
currLen int
limExceeded bool
diffEnabled bool
mtx sync.Mutex
}

// NewDistinctValue with the given maximum data size. This is calculated
// as the total length of the recorded strings. For ease of use, maximum=0
// is interpreted as unlimited.
// Use NewDistinctValueWithDiff to enable diff support, but that one is slightly slower.
func NewDistinctValue[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] {
return &DistinctValue[T]{
values: make(map[T]struct{}),
new: make(map[T]struct{}),
maxLen: maxDataSize,
len: len,
values: make(map[T]struct{}),
new: make(map[T]struct{}),
maxLen: maxDataSize,
diffEnabled: false, // disable diff to make it faster
len: len,
}
}

// NewDistinctValueWithDiff is like NewDistinctValue but with diff support enabled.
func NewDistinctValueWithDiff[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] {
return &DistinctValue[T]{
values: make(map[T]struct{}),
new: make(map[T]struct{}),
maxLen: maxDataSize,
diffEnabled: true,
len: len,
}
}

// Collect adds a new value to the distinct value collector.
// It returns true when the limit has been reached and no more values can fit.
// Callers can use the return value of Collect, or call Exceeded, to stop early.
func (d *DistinctValue[T]) Collect(v T) (exceeded bool) {
d.mtx.RLock()
if _, ok := d.values[v]; ok {
d.mtx.RUnlock()
return // Already present
d.mtx.Lock()
defer d.mtx.Unlock()

if d.limExceeded {
return true
}
d.mtx.RUnlock()

// Calculate length
valueLen := d.len(v)

d.mtx.Lock()
defer d.mtx.Unlock()
// Can it fit?
// note: we will stop adding values slightly before the limit is reached
if d.maxLen > 0 && d.currLen+valueLen >= d.maxLen {
// No, it can't fit
d.limExceeded = true
return true
}

if _, ok := d.values[v]; ok {
return // Already present
}

// Record total inspected length regardless
d.totalLen += valueLen

// Can it fit?
if d.maxLen > 0 && d.currLen+valueLen > d.maxLen {
// No
return true
if d.diffEnabled {
d.new[v] = struct{}{}
}

d.new[v] = struct{}{}
d.values[v] = struct{}{}
d.currLen += valueLen

return false
}

// Values returns the final list of distinct values collected and sorted.
func (d *DistinctValue[T]) Values() []T {
ss := make([]T, 0, len(d.values))

d.mtx.RLock()
defer d.mtx.RUnlock()
d.mtx.Lock()
defer d.mtx.Unlock()

ss := make([]T, 0, len(d.values))
for k := range d.values {
ss = append(ss, k)
}

return ss
}

// Exceeded indicates if some values were lost because the maximum size limit was met.
// Exceeded indicates that we have exceeded the limit.
// It can be used to stop early and to avoid collecting further values.
func (d *DistinctValue[T]) Exceeded() bool {
d.mtx.RLock()
defer d.mtx.RUnlock()
return d.totalLen > d.currLen
d.mtx.Lock()
defer d.mtx.Unlock()

return d.limExceeded
}

// TotalDataSize is the total size of all distinct strings encountered.
func (d *DistinctValue[T]) TotalDataSize() int {
d.mtx.RLock()
defer d.mtx.RUnlock()
return d.totalLen
// Size is the total size of all distinct items collected
func (d *DistinctValue[T]) Size() int {
d.mtx.Lock()
defer d.mtx.Unlock()

return d.currLen
}

// Diff returns all new strings collected since the last time diff was called
// returns nil if diff is not enabled
func (d *DistinctValue[T]) Diff() []T {
ss := make([]T, 0, len(d.new))
d.mtx.Lock()
defer d.mtx.Unlock()

d.mtx.RLock()
defer d.mtx.RUnlock()
if !d.diffEnabled {
return nil
}

ss := make([]T, 0, len(d.new))
for k := range d.new {
ss = append(ss, k)
}
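Two behavioral consequences of the rewrite are worth calling out: once the limit is reached the collector latches limExceeded and rejects every later value immediately, and Size() reports the bytes actually stored rather than everything inspected (the role the removed totalLen/TotalDataSize used to play). A quick sketch of that behavior, with made-up sizes:

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/collector"
)

func main() {
	// Limit of 10 bytes; each value costs len(value) bytes.
	d := collector.NewDistinctValue(10, func(s string) int { return len(s) })

	fmt.Println(d.Collect("aaaa")) // false: 4 bytes stored
	fmt.Println(d.Collect("bbbb")) // false: 8 bytes stored
	fmt.Println(d.Collect("cc"))   // true: 8+2 >= 10, limit latched, value dropped
	fmt.Println(d.Collect("d"))    // true: rejected immediately, limExceeded is sticky

	fmt.Println(d.Exceeded()) // true
	fmt.Println(d.Size())     // 8: bytes actually stored, not bytes inspected
}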
46 changes: 45 additions & 1 deletion pkg/collector/distinct_value_collector_test.go
@@ -1,14 +1,17 @@
package collector

import (
"fmt"
"sort"
"strconv"
"testing"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
)

func TestDistinctValueCollectorDiff(t *testing.T) {
d := NewDistinctValue[string](0, func(s string) int { return len(s) })
d := NewDistinctValueWithDiff[string](0, func(s string) int { return len(s) })

d.Collect("123")
d.Collect("4567")
@@ -28,3 +31,44 @@ func stringsSlicesEqual(t *testing.T, a, b []string) {
sort.Strings(b)
require.Equal(t, a, b)
}

func BenchmarkCollect(b *testing.B) {
// simulate 100 ingesters, each returning 10_000 tag values
numIngesters := 100
numTagValuesPerIngester := 10_000
ingesterTagValues := make([][]tempopb.TagValue, numIngesters)
for i := 0; i < numIngesters; i++ {
tagValues := make([]tempopb.TagValue, numTagValuesPerIngester)
for j := 0; j < numTagValuesPerIngester; j++ {
tagValues[j] = tempopb.TagValue{
Type: "string",
Value: fmt.Sprintf("value_%d_%d", i, j),
}
}
ingesterTagValues[i] = tagValues
}

limits := []int{
0, // no limit
100_000, // 100KB
1_000_000, // 1MB
10_000_000, // 10MB
}

b.ResetTimer() // to exclude the setup time for generating tag values
for _, lim := range limits {
b.Run("limit:"+strconv.Itoa(lim), func(b *testing.B) {
for n := 0; n < b.N; n++ {
// NewDistinctValue is collecting tag values without diff support
distinctValues := NewDistinctValue(lim, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) })
for _, tagValues := range ingesterTagValues {
for _, v := range tagValues {
if distinctValues.Collect(v) {
break // stop early if limit is reached
}
}
}
}
})
}
}
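
To reproduce the comparison locally, a run along the lines of go test -bench=BenchmarkCollect -benchmem ./pkg/collector/ from the repository root should work; the exact invocation is not part of this commit.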
