Skip to content

Commit

Permalink
Return partial results for tags and tag values lookups (#1517)
Browse files Browse the repository at this point in the history
* Return partial results for tags and tag values lookups

* changelog
  • Loading branch information
mdisibio authored Jun 28, 2022
1 parent 0d86e95 commit e1005a2
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ traces_spanmetrics_latency_{sum,count,bucket}
```
Additionally, default label `span_status` is renamed to `status_code`.
* [CHANGE] Update to Go 1.18 [#1504](https://github.com/grafana/tempo/pull/1504) (@annanay25)
* [CHANGE] Change tag/value lookups to return partial results when reaching response size limit instead of failing [#1517](https://github.com/grafana/tempo/pull/1517) (@mdisibio)
* [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn)
* [FEATURE] Include rollout dashboard [#1456](https://github.com/grafana/tempo/pull/1456) (@zalegrala)
* [FEATURE] Add SentinelPassword configuration for Redis [#1463](https://github.com/grafana/tempo/pull/1463) (@zalegrala)
Expand Down
59 changes: 23 additions & 36 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingester

import (
"context"
"fmt"
"sort"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -186,83 +185,79 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr
}

func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) {
tags := map[string]struct{}{}
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

kv := &tempofb.KeyValues{}
err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
for i, ii := 0, entry.TagsLength(); i < ii; i++ {
entry.Tags(kv, i)
key := string(kv.Key())
// check the tag is already set, this is more performant with repetitive values
if _, ok := tags[key]; !ok {
tags[key] = struct{}{}
}
distinctValues.Collect(key)
}
})
if err != nil {
return nil, err
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
return block.Tags(ctx, tags)
return block.Tags(ctx, distinctValues.Collect)
})
if err != nil {
return nil, err
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
}

return &tempopb.SearchTagsResponse{
TagNames: extractKeys(tags),
TagNames: distinctValues.Strings(),
}, nil
}

func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) {
values := map[string]struct{}{}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

limit := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

kv := &tempofb.KeyValues{}
tagNameBytes := []byte(tagName)
err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
kv := tempofb.FindTag(entry, kv, tagNameBytes)
if kv != nil {
for i, ii := 0, kv.ValueLength(); i < ii; i++ {
key := string(kv.Value(i))
// check the value is already set, this is more performant with repetitive values
if _, ok := values[key]; !ok {
values[key] = struct{}{}
}
distinctValues.Collect(key)
}
}
})
if err != nil {
return nil, err
}

// check if size of values map is within limit after scanning live traces
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
return block.TagValues(ctx, tagName, values)
return block.TagValues(ctx, tagName, distinctValues.Collect)
})
if err != nil {
return nil, err
}

// check if size of values map is within limit after scanning all blocks
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
}

return &tempopb.SearchTagValuesResponse{
TagValues: extractKeys(values),
TagValues: distinctValues.Strings(),
}, nil
}

Expand Down Expand Up @@ -354,11 +349,3 @@ func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visitFn
}
return nil
}

// extractKeys returns the keys of the given set as a slice. The order of
// the returned keys is unspecified because Go map iteration is randomized.
func extractKeys(set map[string]struct{}) []string {
	result := make([]string, 0, len(set))
	for key := range set {
		result = append(result, key)
	}
	return result
}
16 changes: 9 additions & 7 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tag
assert.Equal(t, expectedTagValues, srv.TagValues)
}

// TestInstanceSearchMaxBytesPerTagValuesQueryFails confirms that SearchTagValues returns
// an error if the bytes of the found tag value exceeds the MaxBytesPerTagValuesQuery limit
func TestInstanceSearchMaxBytesPerTagValuesQueryFails(t *testing.T) {
// TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial confirms that SearchTagValues returns
// partial results if the bytes of the found tag value exceeds the MaxBytesPerTagValuesQuery limit
func TestInstanceSearchMaxBytesPerTagValuesQueryReturnsPartial(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{
MaxBytesPerTagValuesQuery: 10,
})
Expand All @@ -186,9 +186,9 @@ func TestInstanceSearchMaxBytesPerTagValuesQueryFails(t *testing.T) {
_, _ = writeTracesWithSearchData(t, i, tagKey, tagValue, true)

userCtx := user.InjectOrgID(context.Background(), "fake")
srv, err := i.SearchTagValues(userCtx, tagKey)
assert.Error(t, err)
assert.Nil(t, srv)
resp, err := i.SearchTagValues(userCtx, tagKey)
require.NoError(t, err)
require.Equal(t, 2, len(resp.TagValues)) // Only two values of the form "bar123" fit in the 10 byte limit above.
}

// writes traces to the given instance along with search data. returns
Expand Down Expand Up @@ -346,7 +346,9 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
})

go concurrent(func() {
_, err := i.SearchTags(context.Background())
// SearchTags queries now require userID in ctx
ctx := user.InjectOrgID(context.Background(), "test")
_, err := i.SearchTags(ctx)
require.NoError(t, err, "error getting search tags")
})

Expand Down
65 changes: 27 additions & 38 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,45 +304,43 @@ func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest)
}

func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest) (*tempopb.SearchTagsResponse, error) {
_, err := user.ExtractOrgID(ctx)
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTags")
}

limit := q.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

// Virtual tags. Get these first
for _, k := range search.GetVirtualTags() {
distinctValues.Collect(k)
}

// Get results from all ingesters
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTags")
}

// Get results from all ingesters
lookupResults, err := q.forGivenIngesters(ctx, replicationSet, func(client tempopb.QuerierClient) (interface{}, error) {
return client.SearchTags(ctx, req)
})
if err != nil {
return nil, errors.Wrap(err, "error querying ingesters in Querier.SearchTags")
}

// Collect only unique values
uniqueMap := map[string]struct{}{}
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagsResponse).TagNames {
uniqueMap[res] = struct{}{}
distinctValues.Collect(res)
}
}

// Extra tags
for _, k := range search.GetVirtualTags() {
uniqueMap[k] = struct{}{}
if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
}

// Final response (sorted)
resp := &tempopb.SearchTagsResponse{
TagNames: make([]string, 0, len(uniqueMap)),
}
for k := range uniqueMap {
resp.TagNames = append(resp.TagNames, k)
TagNames: distinctValues.Strings(),
}
sort.Strings(resp.TagNames)

return resp, nil
}
Expand All @@ -353,47 +351,38 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTagValues")
}

limit := q.limits.MaxBytesPerTagValuesQuery(userID)
distinctValues := util.NewDistinctStringCollector(limit)

// Virtual tags values. Get these first.
for _, v := range search.GetVirtualTagValues(req.TagName) {
distinctValues.Collect(v)
}

// Get results from all ingesters
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTagValues")
}

// Get results from all ingesters
lookupResults, err := q.forGivenIngesters(ctx, replicationSet, func(client tempopb.QuerierClient) (interface{}, error) {
return client.SearchTagValues(ctx, req)
})
if err != nil {
return nil, errors.Wrap(err, "error querying ingesters in Querier.SearchTagValues")
}

// Collect only unique values
uniqueMap := map[string]struct{}{}
for _, resp := range lookupResults {
for _, res := range resp.response.(*tempopb.SearchTagValuesResponse).TagValues {
uniqueMap[res] = struct{}{}
distinctValues.Collect(res)
}
}

// Extra values
for _, v := range search.GetVirtualTagValues(req.TagName) {
uniqueMap[v] = struct{}{}
}

// fetch response size limit for tag-values query
tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)
if tagValuesLimitBytes > 0 && !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", tagValuesLimitBytes)
if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
}

// Final response (sorted)
resp := &tempopb.SearchTagValuesResponse{
TagValues: make([]string, 0, len(uniqueMap)),
}
for k := range uniqueMap {
resp.TagValues = append(resp.TagValues, k)
TagValues: distinctValues.Strings(),
}
sort.Strings(resp.TagValues)

return resp, nil
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/tempofb/searchdatamap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
flatbuffers "github.com/google/flatbuffers/go"
)

type TagCallback func(t string)

type SearchDataMap map[string]map[string]struct{}

func NewSearchDataMap() SearchDataMap {
Expand Down Expand Up @@ -56,13 +58,13 @@ func (s SearchDataMap) Range(f func(k, v string)) {
}
}

func (s SearchDataMap) RangeKeys(f func(k string)) {
func (s SearchDataMap) RangeKeys(f TagCallback) {
for k := range s {
f(k)
}
}

func (s SearchDataMap) RangeKeyValues(k string, f func(v string)) {
func (s SearchDataMap) RangeKeyValues(k string, f TagCallback) {
for v := range s[k] {
f(v)
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/util/distinct_string_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package util

import "sort"

// DistinctStringCollector accumulates a set of unique strings while
// enforcing an optional cap on the combined byte length of the strings it
// keeps. Values that would push the kept size past the cap are silently
// dropped, which lets callers return partial results instead of failing.
type DistinctStringCollector struct {
	kept    map[string]struct{} // strings accepted so far
	limit   int                 // byte budget for kept strings; 0 disables the limit
	keptLen int                 // combined length of the accepted strings
	seenLen int                 // combined length of every new (non-duplicate) string offered
}

// NewDistinctStringCollector returns a collector bounded by maxDataSize,
// measured as the total length of the recorded strings. For ease of use,
// a maximum of 0 is interpreted as unlimited.
func NewDistinctStringCollector(maxDataSize int) *DistinctStringCollector {
	return &DistinctStringCollector{
		kept:  map[string]struct{}{},
		limit: maxDataSize,
	}
}

// Collect records s when it has not been seen before and still fits within
// the configured budget. A string that does not fit is counted toward the
// total data size but is not retained.
// NOTE(review): a string rejected for size is not remembered, so offering
// the same oversized string repeatedly inflates TotalDataSize — confirm
// this over-count is acceptable for the reported metric.
func (d *DistinctStringCollector) Collect(s string) {
	if _, dup := d.kept[s]; dup {
		// Duplicate of an accepted value: nothing to do.
		return
	}

	d.seenLen += len(s)

	if d.limit > 0 && d.keptLen+len(s) > d.limit {
		// Over budget: drop the value but keep counting what was offered.
		return
	}

	d.kept[s] = struct{}{}
	d.keptLen += len(s)
}

// Strings returns the collected distinct values in sorted order.
func (d *DistinctStringCollector) Strings() []string {
	out := make([]string, 0, len(d.kept))
	for s := range d.kept {
		out = append(out, s)
	}
	sort.Strings(out)
	return out
}

// Exceeded reports whether any values were discarded because the maximum
// size limit was reached.
func (d *DistinctStringCollector) Exceeded() bool {
	return d.seenLen > d.keptLen
}

// TotalDataSize is the combined length of all distinct strings offered to
// the collector, including any that were dropped for exceeding the limit.
func (d *DistinctStringCollector) TotalDataSize() int {
	return d.seenLen
}
Loading

0 comments on commit e1005a2

Please sign in to comment.