diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2081d7cffe2..1b08428e862 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,8 @@
 ## main / unreleased
+* [CHANGE] Search: Add new per-tenant limit `max_bytes_per_tag_values_query` to limit the size of the tag-values response. [#1068](https://github.com/grafana/tempo/pull/1068) (@annanay25)
 * [ENHANCEMENT] Expose `upto` parameter on hedged requests for each backend with `hedge_requests_up_to`. [#1085](https://github.com/grafana/tempo/pull/1085) (@joe-elliott)
+* [ENHANCEMENT] Search: drop use of TagCache, extract tags and tag values on-demand [#1068](https://github.com/grafana/tempo/pull/1068) (@kvrhdn)
 * [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno)
 * [ENHANCEMENT] Add middleware to compress frontend HTTP responses with gzip if requested [#1080](https://github.com/grafana/tempo/pull/1080) (@kvrhdn, @zalegrala)
 * [ENHANCEMENT] Allow query disablement in vulture [#1117](https://github.com/grafana/tempo/pull/1117) (@zalegrala)
diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go
index b8d4765c536..3ac45ed5d3e 100644
--- a/cmd/tempo/app/modules.go
+++ b/cmd/tempo/app/modules.go
@@ -310,7 +310,7 @@ func (t *App) setupModuleManager() error {
 		Ring:                 {Server, MemberlistKV},
 		Distributor:          {Ring, Server, Overrides},
 		Ingester:             {Store, Server, Overrides, MemberlistKV},
-		Querier:              {Store, Ring},
+		Querier:              {Store, Ring, Overrides},
 		Compactor:            {Store, Server, Overrides, MemberlistKV},
 		SingleBinary:         {Compactor, QueryFrontend, Querier, Ingester, Distributor},
 		ScalableSingleBinary: {SingleBinary},
diff --git a/example/docker-compose/tempo-search/overrides.yaml b/example/docker-compose/tempo-search/overrides.yaml
index 72971ca395f..b431c9006d9 100644
--- a/example/docker-compose/tempo-search/overrides.yaml
+++ b/example/docker-compose/tempo-search/overrides.yaml
@@ -9,4 +9,5 @@ overrides:
     max_global_traces_per_user: 0
     max_bytes_per_trace: 50000
     max_search_bytes_per_trace: 0
+    max_bytes_per_tag_values_query: 5000000
     block_retention: 0s
diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index 15da630880c..853943b2c18 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -179,9 +179,6 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 	if err != nil {
 		level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
 	}
-
-	// periodically purge tag cache, keep tags within complete block timeout (i.e. data that is locally)
-	instance.PurgeExpiredSearchTags(time.Now().Add(-i.cfg.CompleteBlockTimeout))
 }

 func (i *Ingester) flushLoop(j int) {
diff --git a/modules/ingester/ingester_search.go b/modules/ingester/ingester_search.go
index a9ffb7860f8..0edd1cf11e9 100644
--- a/modules/ingester/ingester_search.go
+++ b/modules/ingester/ingester_search.go
@@ -37,13 +37,12 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques
 		return &tempopb.SearchTagsResponse{}, nil
 	}

-	tags := inst.GetSearchTags()
-
-	resp := &tempopb.SearchTagsResponse{
-		TagNames: tags,
+	res, err := inst.SearchTags(ctx)
+	if err != nil {
+		return nil, err
 	}

-	return resp, nil
+	return res, nil
 }

 func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) {
@@ -56,13 +55,12 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa
 		return &tempopb.SearchTagValuesResponse{}, nil
 	}

-	vals := inst.GetSearchTagValues(req.TagName)
-
-	resp := &tempopb.SearchTagValuesResponse{
-		TagValues: vals,
+	res, err := inst.SearchTagValues(ctx, req.TagName)
+	if err != nil {
+		return nil, err
 	}

-	return resp, nil
+	return res, nil
 }

 // todo(search): consolidate. this only exists so that the ingester continues to implement the tempopb.QuerierServer interface.
diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index 489f4cc72e4..36e8bb7ce0b 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -75,7 +75,6 @@ type instance struct {
 	searchHeadBlock      *searchStreamingBlockEntry
 	searchAppendBlocks   map[*wal.AppendBlock]*searchStreamingBlockEntry
 	searchCompleteBlocks map[*wal.LocalBlock]*searchLocalBlockEntry
-	searchTagCache       *search.TagCache

 	lastBlockCut time.Time
@@ -107,7 +106,6 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *
 		traces:               map[uint32]*trace{},
 		searchAppendBlocks:   map[*wal.AppendBlock]*searchStreamingBlockEntry{},
 		searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{},
-		searchTagCache:       search.NewTagCache(),

 		instanceID:         instanceID,
 		tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
@@ -176,10 +174,6 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte,
 		return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err)
 	}

-	if searchData != nil {
-		i.RecordSearchLookupValues(searchData)
-	}
-
 	i.tracesMtx.Lock()
 	defer i.tracesMtx.Unlock()
diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go
index 2f4ce02a2c9..549962f97ef 100644
--- a/modules/ingester/instance_search.go
+++ b/modules/ingester/instance_search.go
@@ -3,12 +3,13 @@ package ingester
 import (
 	"context"
 	"sort"
-	"time"

 	cortex_util "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/tempo/pkg/util"
 	"github.com/opentracing/opentracing-go"
 	ot_log "github.com/opentracing/opentracing-go/log"
+	"github.com/weaveworks/common/user"

 	"github.com/grafana/tempo/pkg/tempofb"
 	"github.com/grafana/tempo/pkg/tempopb"
@@ -93,6 +94,8 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *

 	span.LogFields(ot_log.Event("live traces mtx acquired"))

+	entry := &tempofb.SearchEntry{} // buffer
+
 	for _, t := range i.traces {
 		if sr.Quit() {
 			return
@@ -106,7 +109,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *
 		for _, s := range t.searchData {
 			sr.AddBytesInspected(uint64(len(s)))

-			entry := tempofb.SearchEntryFromBytes(s)
+			entry.Reset(s)
 			if p.Matches(entry) {
 				newResult := search.GetSearchResultFromData(entry)
 				if result != nil {
@@ -181,19 +184,187 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr
 	}
 }

-func (i *instance) GetSearchTags() []string {
-	return i.searchTagCache.GetNames()
+func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) {
+	tags := map[string]struct{}{}
+
+	kv := &tempofb.KeyValues{}
+	err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
+		for i, ii := 0, entry.TagsLength(); i < ii; i++ {
+			entry.Tags(kv, i)
+			key := string(kv.Key())
+			// check if the tag is already set, this is more performant with repetitive values
+			if _, ok := tags[key]; !ok {
+				tags[key] = struct{}{}
+			}
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
+		return block.Tags(ctx, tags)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &tempopb.SearchTagsResponse{
+		TagNames: extractKeys(tags),
+	}, nil
+}
+
+func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) {
+	values := map[string]struct{}{}
+
+	userID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// get limit from override
+	maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
+
+	kv := &tempofb.KeyValues{}
+	tagNameBytes := []byte(tagName)
+	err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
+		kv := tempofb.FindTag(entry, kv, tagNameBytes)
+		if kv != nil {
+			for i, ii := 0, kv.ValueLength(); i < ii; i++ {
+				key := string(kv.Value(i))
+				// check if the value is already set, this is more performant with repetitive values
+				if _, ok := values[key]; !ok {
+					values[key] = struct{}{}
+				}
+			}
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// check if size of values map is within limit after scanning live traces
+	if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
+		level.Warn(cortex_util.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName)
+		// return empty response to avoid querier OOMs
+		return &tempopb.SearchTagValuesResponse{
+			TagValues: []string{},
+		}, nil
+	}
+
+	err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
+		return block.TagValues(ctx, tagName, values)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// check if size of values map is within limit after scanning all blocks
+	if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
+		level.Warn(cortex_util.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName)
+		// return empty response to avoid querier OOMs
+		return &tempopb.SearchTagValuesResponse{
+			TagValues: []string{},
+		}, nil
+	}
+
+	return &tempopb.SearchTagValuesResponse{
+		TagValues: extractKeys(values),
+	}, nil
+}
+
+func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn func(entry *tempofb.SearchEntry)) error {
+	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces")
+	defer span.Finish()
+
+	i.tracesMtx.Lock()
+	defer i.tracesMtx.Unlock()
+
+	se := &tempofb.SearchEntry{}
+	for _, t := range i.traces {
+		for _, s := range t.searchData {
+			se.Reset(s)
+			visitFn(se)
+
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (i *instance) visitSearchableBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
+	i.blocksMtx.RLock()
+	defer i.blocksMtx.RUnlock()
+
+	err := i.visitSearchableBlocksWAL(ctx, visitFn)
+	if err != nil {
+		return err
+	}
+
+	return i.visitSearchableBlocksLocalBlocks(ctx, visitFn)
 }

-func (i *instance) GetSearchTagValues(tagName string) []string {
-	return i.searchTagCache.GetValues(tagName)
+// visitSearchableBlocksWAL visits every WAL block. Must be called under lock.
+func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
+	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL")
+	defer span.Finish()
+
+	visitUnderLock := func(entry *searchStreamingBlockEntry) error {
+		entry.mtx.RLock()
+		defer entry.mtx.RUnlock()
+
+		return visitFn(entry.b)
+	}
+
+	err := visitUnderLock(i.searchHeadBlock)
+	if err != nil {
+		return err
+	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	for _, b := range i.searchAppendBlocks {
+		err := visitUnderLock(b)
+		if err != nil {
+			return err
+		}
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+	}
+	return nil
 }

-func (i *instance) RecordSearchLookupValues(b []byte) {
-	s := tempofb.SearchEntryFromBytes(b)
-	i.searchTagCache.SetData(time.Now(), s)
+// visitSearchableBlocksLocalBlocks visits every local block. Must be called under lock.
+func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
+	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks")
+	defer span.Finish()
+
+	visitUnderLock := func(entry *searchLocalBlockEntry) error {
+		entry.mtx.RLock()
+		defer entry.mtx.RUnlock()
+
+		return visitFn(entry.b)
+	}
+
+	for _, b := range i.searchCompleteBlocks {
+		err := visitUnderLock(b)
+		if err != nil {
+			return err
+		}
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+	}
+	return nil
 }

-func (i *instance) PurgeExpiredSearchTags(before time.Time) {
-	i.searchTagCache.PurgeExpired(before)
+func extractKeys(set map[string]struct{}) []string {
+	keys := make([]string, 0, len(set))
+	for k := range set {
+		keys = append(keys, k)
+	}
+	return keys
 }
diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go
index f7d164e1813..11e4c2852fd 100644
--- a/modules/ingester/instance_search_test.go
+++ b/modules/ingester/instance_search_test.go
@@ -10,6 +10,11 @@ import (
 	"time"

 	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/uber-go/atomic"
+	"github.com/weaveworks/common/user"
+
 	"github.com/grafana/tempo/modules/overrides"
 	"github.com/grafana/tempo/pkg/model"
 	"github.com/grafana/tempo/pkg/tempofb"
@@ -17,9 +22,6 @@ import (
 	"github.com/grafana/tempo/pkg/util"
 	"github.com/grafana/tempo/pkg/util/test"
 	"github.com/grafana/tempo/tempodb/search"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"github.com/uber-go/atomic"
 )

 func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
@@ -247,11 +249,15 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
 	})

 	go concurrent(func() {
-		i.GetSearchTags()
+		_, err := i.SearchTags(context.Background())
+		require.NoError(t, err, "error getting search tags")
 	})

 	go concurrent(func() {
-		i.GetSearchTagValues(tagKey)
+		// SearchTagValues queries now require userID in ctx
+		ctx := user.InjectOrgID(context.Background(), "test")
+		_, err := i.SearchTagValues(ctx, tagKey)
+		require.NoError(t, err, "error getting search tag values")
 	})

 	time.Sleep(2000 * time.Millisecond)
diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go
index dd298dfcd7d..03b865353d5 100644
--- a/modules/overrides/limits.go
+++ b/modules/overrides/limits.go
@@ -21,13 +21,14 @@ const (
 	ErrorPrefixRateLimited       = "RATE_LIMITED:"

 	// metrics
-	MetricMaxLocalTracesPerUser   = "max_local_traces_per_user"
-	MetricMaxGlobalTracesPerUser  = "max_global_traces_per_user"
-	MetricMaxBytesPerTrace        = "max_bytes_per_trace"
-	MetricMaxSearchBytesPerTrace  = "max_search_bytes_per_trace"
-	MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
-	MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
-	MetricBlockRetention          = "block_retention"
+	MetricMaxLocalTracesPerUser     = "max_local_traces_per_user"
+	MetricMaxGlobalTracesPerUser    = "max_global_traces_per_user"
+	MetricMaxBytesPerTrace          = "max_bytes_per_trace"
+	MetricMaxSearchBytesPerTrace    = "max_search_bytes_per_trace"
+	MetricMaxBytesPerTagValuesQuery = "max_bytes_per_tag_values_query"
+	MetricIngestionRateLimitBytes   = "ingestion_rate_limit_bytes"
+	MetricIngestionBurstSizeBytes   = "ingestion_burst_size_bytes"
+	MetricBlockRetention            = "block_retention"
 )

 var (
@@ -57,6 +58,9 @@ type Limits struct {
 	// Compactor enforced limits.
 	BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`

+	// Querier enforced limits.
+	MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
+
 	// Configuration for overrides, convenient if it goes here.
 	PerTenantOverrideConfig string         `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
 	PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"`
@@ -75,6 +79,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxBytesPerTrace, "ingester.max-bytes-per-trace", 50e5, "Maximum size of a trace in bytes. 0 to disable.")
 	f.IntVar(&l.MaxSearchBytesPerTrace, "ingester.max-search-bytes-per-trace", 50e3, "Maximum size of search data per trace in bytes. 0 to disable.")

+	// Querier limits
+	f.IntVar(&l.MaxBytesPerTagValuesQuery, "querier.max-bytes-per-tag-values-query", 50e5, "Maximum size of the response for a tag-values query. Used mainly to limit the number of values associated with a particular tag.")
+
 	f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
 	_ = l.PerTenantOverridePeriod.Set("10s")
 	f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.")
@@ -89,6 +96,7 @@ func (l *Limits) Collect(ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalTracesPerUser), MetricMaxGlobalTracesPerUser)
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTrace), MetricMaxBytesPerTrace)
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxSearchBytesPerTrace), MetricMaxSearchBytesPerTrace)
+	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTagValuesQuery), MetricMaxBytesPerTagValuesQuery)
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionRateLimitBytes), MetricIngestionRateLimitBytes)
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionBurstSizeBytes), MetricIngestionBurstSizeBytes)
 	ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.BlockRetention), MetricBlockRetention)
diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go
index f6f463214ca..4970ec3cde5 100644
--- a/modules/overrides/overrides.go
+++ b/modules/overrides/overrides.go
@@ -242,6 +242,11 @@ func (o *Overrides) MaxSearchBytesPerTrace(userID string) int {
 	return o.getOverridesForUser(userID).MaxSearchBytesPerTrace
 }

+// MaxBytesPerTagValuesQuery returns the maximum size of a response to a tag-values query allowed for a user.
+func (o *Overrides) MaxBytesPerTagValuesQuery(userID string) int {
+	return o.getOverridesForUser(userID).MaxBytesPerTagValuesQuery
+}
+
 // IngestionRateLimitBytes is the number of spans per second allowed for this tenant.
 func (o *Overrides) IngestionRateLimitBytes(userID string) float64 {
 	return float64(o.getOverridesForUser(userID).IngestionRateLimitBytes)
diff --git a/modules/querier/querier.go b/modules/querier/querier.go
index 0d1eb1e7d4a..95a9fa49004 100644
--- a/modules/querier/querier.go
+++ b/modules/querier/querier.go
@@ -19,6 +19,7 @@ import (
 	"github.com/grafana/tempo/modules/storage"
 	"github.com/grafana/tempo/pkg/model"
 	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/grafana/tempo/pkg/util"
 	"github.com/grafana/tempo/pkg/validation"
 	"github.com/grafana/tempo/tempodb/encoding/common"
 	"github.com/grafana/tempo/tempodb/search"
@@ -329,11 +330,14 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
 }

 func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) {
-	_, err := user.ExtractOrgID(ctx)
+	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTagValues")
 	}

+	// fetch response size limit for tag-values query
+	tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)
+
 	replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
 	if err != nil {
 		return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTagValues")
@@ -360,6 +364,12 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
 			uniqueMap[v] = struct{}{}
 		}

+	if !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
+		return &tempopb.SearchTagValuesResponse{
+			TagValues: []string{},
+		}, nil
+	}
+
 	// Final response (sorted)
 	resp := &tempopb.SearchTagValuesResponse{
 		TagValues: make([]string, 0, len(uniqueMap)),
diff --git a/pkg/tempofb/searchdata_test.go b/pkg/tempofb/searchdata_test.go
index e49a23a3e77..3f521b9e390 100644
--- a/pkg/tempofb/searchdata_test.go
+++ b/pkg/tempofb/searchdata_test.go
@@ -91,3 +91,34 @@ func TestEncodingSize(t *testing.T) {
 	fmt.Printf(" - Tag: %.1f bytes after\n", float32(tagValueLongTermTags-tagValueBaseLine)/float32(delta))
 	fmt.Printf(" - Value: %.1f bytes after\n", float32(tagValueLongTermValues-tagValueBaseLine)/float32(delta))
 }
+
+func TestContainsTag(t *testing.T) {
+	m := &SearchEntryMutable{}
+	m.AddTag("key1", "value")
+	m.AddTag("key2", "value")
+	m.AddTag("key3", "value")
+	m.AddTag("key4", "value")
+	m.AddTag("key5", "value")
+	m.AddTag("key6", "value")
+
+	e := NewSearchEntryFromBytes(m.ToBytes())
+
+	kv := &KeyValues{}
+
+	testCases := []struct {
+		key, value string
+		found      bool
+	}{
+		{"key1", "value", true},
+		{"key1", "value2", false},
+		{"key6", "value", true},
+		{"key0", "value", false},
+		{"key10", "value", false},
+	}
+
+	for _, tc := range testCases {
+		t.Run(fmt.Sprint(tc.key, "=", tc.value), func(t *testing.T) {
+			require.Equal(t, tc.found, ContainsTag(e, kv, []byte(tc.key), []byte(tc.value)))
+		})
+	}
+}
diff --git a/pkg/tempofb/searchdata_util.go b/pkg/tempofb/searchdata_util.go
index af242586c1e..0591c74a162 100644
--- a/pkg/tempofb/searchdata_util.go
+++ b/pkg/tempofb/searchdata_util.go
@@ -2,7 +2,6 @@ package tempofb
 import (
 	"bytes"
-	"sort"

 	flatbuffers "github.com/google/flatbuffers/go"
 	"github.com/grafana/tempo/tempodb/encoding/common"
@@ -146,7 +145,12 @@ func (s *SearchEntry) Contains(k []byte, v []byte, buffer *KeyValues) bool {
 	return ContainsTag(s, buffer, k, v)
 }

-func SearchEntryFromBytes(b []byte) *SearchEntry {
+func (s *SearchEntry) Reset(b []byte) {
+	n := flatbuffers.GetUOffsetT(b)
+	s.Init(b, n)
+}
+
+func NewSearchEntryFromBytes(b []byte) *SearchEntry {
 	return GetRootAsSearchEntry(b, 0)
 }

@@ -157,26 +161,8 @@ type FBTagContainer interface {
 }

 func ContainsTag(s FBTagContainer, kv *KeyValues, k []byte, v []byte) bool {
-	matched := -1
-
-	// Binary search for keys. Flatbuffers are written backwards so
-	// keys are descending (the comparison is reversed).
-	// TODO - We only want exact matches, sort.Search has to make an
-	// extra comparison. We should fork it to make use of the full
-	// tri-state response from bytes.Compare
-	sort.Search(s.TagsLength(), func(i int) bool {
-		s.Tags(kv, i)
-		comparison := bytes.Compare(k, kv.Key())
-		if comparison == 0 {
-			matched = i
-			// TODO it'd be great to exit here and retain the data in kv buffer
-		}
-		return comparison >= 0
-	})
-
-	if matched >= 0 {
-		s.Tags(kv, matched)
-
+	kv = FindTag(s, kv, k)
+	if kv != nil {
 		// Linear search for matching values
 		l := kv.ValueLength()
 		for j := 0; j < l; j++ {
@@ -188,3 +174,42 @@ func ContainsTag(s FBTagContainer, kv *KeyValues, k []byte, v []byte) bool {

 	return false
 }
+
+func FindTag(s FBTagContainer, kv *KeyValues, k []byte) *KeyValues {
+
+	idx := binarySearch(s.TagsLength(), func(i int) int {
+		s.Tags(kv, i)
+		// Note comparison here is backwards because KeyValues are written to flatbuffers in reverse order.
+		return bytes.Compare(kv.Key(), k)
+	})
+
+	if idx >= 0 {
+		// Data is left in buffer when matched
+		return kv
+	}
+
+	return nil
+}
+
+// binarySearch finds the exact matching entry. It returns the matching index (which may be zero) when found, or -1 when not found.
+// Inspired by sort.Search, but it makes use of the tri-state comparator to eliminate the last comparison when
+// we want an exact match, not an insertion point.
+func binarySearch(n int, compare func(int) int) int {
+	i, j := 0, n
+	for i < j {
+		h := int(uint(i+j) >> 1) // avoid overflow when computing h
+		// i ≤ h < j
+		switch compare(h) {
+		case 0:
+			// Found exact match
+			return h
+		case -1:
+			j = h
+		case 1:
+			i = h + 1
+		}
+	}
+
+	// No match
+	return -1
+}
diff --git a/pkg/tempofb/searchdatamap.go b/pkg/tempofb/searchdatamap.go
index 0f5f09adde4..cff823b2376 100644
--- a/pkg/tempofb/searchdatamap.go
+++ b/pkg/tempofb/searchdatamap.go
@@ -12,6 +12,8 @@ type SearchDataMap interface {
 	Contains(k, v string) bool
 	WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT
 	Range(f func(k, v string))
+	RangeKeys(f func(k string))
+	RangeKeyValues(k string, f func(v string))
 }

 func NewSearchDataMap() SearchDataMap {
@@ -70,6 +72,18 @@ func (s SearchDataMapSmall) Range(f func(k, v string)) {
 	}
 }

+func (s SearchDataMapSmall) RangeKeys(f func(k string)) {
+	for k := range s {
+		f(k)
+	}
+}
+
+func (s SearchDataMapSmall) RangeKeyValues(k string, f func(v string)) {
+	for _, v := range s[k] {
+		f(v)
+	}
+}
+
 func (s SearchDataMapSmall) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
 	keys := make([]string, 0, len(s))
 	for k := range s {
@@ -115,6 +129,18 @@ func (s SearchDataMapLarge) Range(f func(k, v string)) {
 	}
 }

+func (s SearchDataMapLarge) RangeKeys(f func(k string)) {
+	for k := range s {
+		f(k)
+	}
+}
+
+func (s SearchDataMapLarge) RangeKeyValues(k string, f func(v string)) {
+	for v := range s[k] {
+		f(v)
+	}
+}
+
 func (s SearchDataMapLarge) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
 	keys := make([]string, 0, len(s))
 	for k := range s {
diff --git a/pkg/tempofb/searchdatamap_test.go b/pkg/tempofb/searchdatamap_test.go
index 63c33feb5ef..c6a3d23323d 100644
--- a/pkg/tempofb/searchdatamap_test.go
+++ b/pkg/tempofb/searchdatamap_test.go
@@ -3,8 +3,71 @@ package tempofb
 import (
 	"fmt"
 	"testing"
+
+	"github.com/stretchr/testify/assert"
 )

+func TestSearchDataMap(t *testing.T) {
+	testCases := []struct {
+		name string
+		impl SearchDataMap
+	}{
+		{"SearchDataMapSmall", &SearchDataMapSmall{}},
+		{"SearchDataMapLarge", &SearchDataMapLarge{}},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			searchDataMap := tc.impl
+
+			assert.False(t, searchDataMap.Contains("key-1", "value-1-2"))
+
+			searchDataMap.Add("key-1", "value-1-1")
+
+			assert.False(t, searchDataMap.Contains("key-1", "value-1-2"))
+
+			searchDataMap.Add("key-1", "value-1-2")
+			searchDataMap.Add("key-2", "value-2-1")
+
+			assert.True(t, searchDataMap.Contains("key-1", "value-1-2"))
+			assert.False(t, searchDataMap.Contains("key-2", "value-1-2"))
+
+			type Pair struct {
+				k string
+				v string
+			}
+			var pairs []Pair
+			capturePairFn := func(k, v string) {
+				pairs = append(pairs, Pair{k, v})
+			}
+
+			searchDataMap.Range(capturePairFn)
+			assert.ElementsMatch(t, []Pair{{"key-1", "value-1-1"}, {"key-1", "value-1-2"}, {"key-2", "value-2-1"}}, pairs)
+
+			var strs []string
+			captureSliceFn := func(value string) {
+				strs = append(strs, value)
+			}
+
+			searchDataMap.RangeKeys(captureSliceFn)
+			assert.ElementsMatch(t, []string{"key-1", "key-2"}, strs)
+			strs = nil
+
+			searchDataMap.RangeKeyValues("key-1", captureSliceFn)
+			assert.ElementsMatch(t, []string{"value-1-1", "value-1-2"}, strs)
+			strs = nil
+
+			searchDataMap.RangeKeyValues("key-2", captureSliceFn)
+			assert.ElementsMatch(t, []string{"value-2-1"}, strs)
+			strs = nil
+
+			searchDataMap.RangeKeyValues("does-not-exist", captureSliceFn)
+			assert.ElementsMatch(t, []string{}, strs)
+			strs = nil
+		})
+	}
+}
+
 func BenchmarkSearchDataMapAdd(b *testing.B) {
 	intfs := []struct {
 		name string
diff --git a/pkg/util/map_size.go b/pkg/util/map_size.go
new file mode 100644
index 00000000000..2fea9c15feb
--- /dev/null
+++ b/pkg/util/map_size.go
@@ -0,0 +1,11 @@
+package util
+
+// MapSizeWithinLimit evaluates the total size of all keys in the map against the limit
+func MapSizeWithinLimit(uniqueMap map[string]struct{}, limit int) bool {
+	var mapSize int
+	for key := range uniqueMap {
+		mapSize += len(key)
+	}
+
+	return mapSize < limit
+}
diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go
index fbd4230bd34..2ec51f3e9a6 100644
--- a/tempodb/search/backend_search_block.go
+++ b/tempodb/search/backend_search_block.go
@@ -28,7 +28,8 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block
 	var err error
 	ctx := context.TODO()
 	indexPageSize := 100 * 1024
-	kv := &tempofb.KeyValues{} // buffer
+	kv := &tempofb.KeyValues{}  // buffer
+	s := &tempofb.SearchEntry{} // buffer

 	// Pinning specific version instead of latest for safety
 	version, err := encoding.FromVersion("v2")
@@ -69,7 +70,7 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block
 			continue
 		}

-		s := tempofb.SearchEntryFromBytes(data)
+		s.Reset(data)

 		header.AddEntry(s)

@@ -140,6 +141,44 @@ func (s *BackendSearchBlock) BlockID() uuid.UUID {
 	return s.id
 }

+func (s *BackendSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error {
+	header, err := s.readSearchHeader(ctx)
+	if err != nil {
+		return err
+	}
+
+	kv := &tempofb.KeyValues{}
+	for i, ii := 0, header.TagsLength(); i < ii; i++ {
+		header.Tags(kv, i)
+		key := string(kv.Key())
+		// check if the tag is already set, this is more performant with repetitive values
+		if _, ok := tags[key]; !ok {
+			tags[key] = struct{}{}
+		}
+	}
+
+	return nil
+}
+
+func (s *BackendSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error {
+	header, err := s.readSearchHeader(ctx)
+	if err != nil {
+		return err
+	}
+
+	kv := tempofb.FindTag(header, &tempofb.KeyValues{}, []byte(tagName))
+	if kv != nil {
+		for j, valueLength := 0, kv.ValueLength(); j < valueLength; j++ {
+			value := string(kv.Value(j))
+			// check if the value is already set, this is more performant with repetitive values
+			if _, ok := tagValues[value]; !ok {
+				tagValues[value] = struct{}{}
+			}
+		}
+	}
+	return nil
+}
+
 // Search iterates through the block looking for matches.
 func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {
 	var pageBuf []byte
@@ -256,3 +295,11 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results
 	return nil
 }
+
+func (s *BackendSearchBlock) readSearchHeader(ctx context.Context) (*tempofb.SearchBlockHeader, error) {
+	hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
+	if err != nil {
+		return nil, err
+	}
+	return tempofb.GetRootAsSearchBlockHeader(hb, 0), nil
+}
diff --git a/tempodb/search/data_combiner.go b/tempodb/search/data_combiner.go
index 6fa28ac3af9..dd2cacf2aa0 100644
--- a/tempodb/search/data_combiner.go
+++ b/tempodb/search/data_combiner.go
@@ -23,13 +23,14 @@ func (*DataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool, erro

 	// Squash all datas into 1
 	data := tempofb.SearchEntryMutable{}
-	kv := &tempofb.KeyValues{} // buffer
+	kv := &tempofb.KeyValues{}   // buffer
+	sd := &tempofb.SearchEntry{} // buffer
 	for _, sb := range searchData {
 		// we append zero-length entries to the WAL even when search is disabled. skipping to prevent unmarshalling and panik :)
 		if len(sb) == 0 {
 			continue
 		}
-		sd := tempofb.SearchEntryFromBytes(sb)
+		sd.Reset(sb)
 		for i, ii := 0, sd.TagsLength(); i < ii; i++ {
 			sd.Tags(kv, i)
 			for j, jj := 0, kv.ValueLength(); j < jj; j++ {
diff --git a/tempodb/search/pipeline_test.go b/tempodb/search/pipeline_test.go
index e40f0580f46..e2d2f047085 100644
--- a/tempodb/search/pipeline_test.go
+++ b/tempodb/search/pipeline_test.go
@@ -70,7 +70,7 @@ func TestPipelineMatchesTags(t *testing.T) {
 			data := tempofb.SearchEntryMutable{
 				Tags: tempofb.NewSearchDataMapWithData(tc.searchData),
 			}
-			sd := tempofb.SearchEntryFromBytes(data.ToBytes())
+			sd := tempofb.NewSearchEntryFromBytes(data.ToBytes())

 			matches := p.Matches(sd)
 			require.Equal(t, tc.shouldMatch, matches)
@@ -140,7 +140,7 @@ func TestPipelineMatchesTraceDuration(t *testing.T) {
 				StartTimeUnixNano: uint64(tc.spanStart),
 				EndTimeUnixNano:   uint64(tc.spanEnd),
 			}
-			sd := tempofb.SearchEntryFromBytes(data.ToBytes())
+			sd := tempofb.NewSearchEntryFromBytes(data.ToBytes())

 			matches := p.Matches(sd)
 			require.Equal(t, tc.shouldMatch, matches)
@@ -200,7 +200,7 @@ func TestPipelineMatchesBlock(t *testing.T) {

 func BenchmarkPipelineMatches(b *testing.B) {

-	entry := tempofb.SearchEntryFromBytes((&tempofb.SearchEntryMutable{
+	entry := tempofb.NewSearchEntryFromBytes((&tempofb.SearchEntryMutable{
 		StartTimeUnixNano: 0,
 		EndTimeUnixNano:   uint64(500 * time.Millisecond / time.Nanosecond), //500ms in nanoseconds
 		Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go
index a6a19cb391f..8a331c04959 100644
--- a/tempodb/search/rescan_blocks.go
+++ b/tempodb/search/rescan_blocks.go
@@ -88,7 +88,7 @@ func newStreamingSearchBlockFromWALReplay(searchFilepath, filename string) (*Str
 	blockHeader := tempofb.NewSearchBlockHeaderMutable()
 	records, warning, err := wal.ReplayWALAndGetRecords(f, v, enc, func(bytes []byte) error {
-		entry := tempofb.SearchEntryFromBytes(bytes)
+		entry := tempofb.NewSearchEntryFromBytes(bytes)
 		blockHeader.AddEntry(entry)
 		return nil
 	})
diff --git a/tempodb/search/searchable_block.go b/tempodb/search/searchable_block.go
index f8aedbca0cd..110e3426bfe 100644
--- a/tempodb/search/searchable_block.go
+++ b/tempodb/search/searchable_block.go
@@ -5,6 +5,8 @@ import (
 )

 type SearchableBlock interface {
+	Tags(ctx context.Context, tags map[string]struct{}) error
+	TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error
 	Search(ctx context.Context, p Pipeline, sr *Results) error
 }
diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go
index 96a389cfbbc..ffdaae00525 100644
--- a/tempodb/search/streaming_search_block.go
+++ b/tempodb/search/streaming_search_block.go
@@ -87,14 +87,36 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD
 	}

 	s.headerMtx.Lock()
-	s.header.AddEntry(tempofb.SearchEntryFromBytes(combined))
+	s.header.AddEntry(tempofb.NewSearchEntryFromBytes(combined))
 	s.headerMtx.Unlock()

 	return s.appender.Append(id, combined)
 }

+func (s *StreamingSearchBlock) Tags(ctx context.Context, tags map[string]struct{}) error {
+	s.header.Tags.RangeKeys(func(k string) {
+		// check if the tag is already set, this is more performant with repetitive values
+		if _, ok := tags[k]; !ok {
+			tags[k] = struct{}{}
+		}
+	})
+	return nil
+}
+
+func (s *StreamingSearchBlock) TagValues(ctx context.Context, tagName string, tagValues map[string]struct{}) error {
+	s.header.Tags.RangeKeyValues(tagName, func(v string) {
+		// check if the value is already set, this is more performant with repetitive values
+		if _, ok := tagValues[v]; !ok {
+			tagValues[v] = struct{}{}
+		}
+	})
+	return nil
+}
+
 // Search the streaming block.
 func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {
+	entry := &tempofb.SearchEntry{}
+
 	if s.closed.Load() {
 		// Generally this means block has already been deleted
 		return nil
 	}
@@ -133,7 +155,7 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul
 		sr.AddBytesInspected(uint64(len(obj)))
 		sr.AddTraceInspected(1)

-		entry := tempofb.SearchEntryFromBytes(obj)
+		entry.Reset(obj)

 		if !p.Matches(entry) {
 			continue
diff --git a/tempodb/search/tag_cache.go b/tempodb/search/tag_cache.go
deleted file mode 100644
index 423204c8369..00000000000
--- a/tempodb/search/tag_cache.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package search
-
-import (
-	"math"
-	"sort"
-	"sync"
-	"time"
-
-	"github.com/grafana/tempo/pkg/tempofb"
-)
-
-type CacheEntry struct {
-	values map[string]int64 // value -> unix timestamp
-}
-
-const maxValuesPerTag = 50
-
-type TagCache struct {
-	lookups map[string]*CacheEntry
-	mtx     sync.RWMutex
-}
-
-func NewTagCache() *TagCache {
-	return &TagCache{
-		lookups: map[string]*CacheEntry{},
-	}
-}
-
-func (s *TagCache) GetNames() []string {
-	s.mtx.RLock()
-	tags := make([]string, 0, len(s.lookups))
-	for k := range s.lookups {
-		tags = append(tags, k)
-	}
-	s.mtx.RUnlock()
-
-	sort.Strings(tags)
-	return tags
-}
-
-func (s *TagCache) GetValues(tagName string) []string {
-	var vals []string
-
-	s.mtx.RLock()
-	if e := s.lookups[tagName]; e != nil {
-		vals = make([]string, 0, len(e.values))
-		for v := range e.values {
-			vals = append(vals, v)
-		}
-	}
-	s.mtx.RUnlock()
-
-	sort.Strings(vals)
-	return vals
-}
-
-func (s *TagCache) SetData(ts time.Time, data *tempofb.SearchEntry) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
-	tsUnix := ts.Unix()
-	kv := &tempofb.KeyValues{}
-
-	l := data.TagsLength()
-	for j := 0; j < l; j++ {
-		data.Tags(kv, j)
-		key := string(kv.Key())
-		l2 := kv.ValueLength()
-		for k := 0; k < l2; k++ {
-			s.setEntry(tsUnix, key, string(kv.Value(k)))
-		}
-	}
-}
-
-// setEntry should be called under lock.
-func (s *TagCache) setEntry(ts int64, k, v string) {
-	e := s.lookups[k]
-	if e == nil {
-		// First entry
-		s.lookups[k] = &CacheEntry{values: map[string]int64{v: ts}}
-		return
-	}
-
-	// Prune oldest as needed
-	for len(e.values) >= maxValuesPerTag {
-		earliestv := ""
-		earliestts := int64(math.MaxInt64)
-
-		for v, ts := range e.values {
-			if ts < earliestts {
-				earliestv = v
-				earliestts = ts
-			}
-		}
-
-		delete(e.values, earliestv)
-	}
-
-	e.values[v] = ts
-}
-
-func (s *TagCache) PurgeExpired(before time.Time) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
-	beforeUnix := before.Unix()
-
-	for k, e := range s.lookups {
-		for v, ts := range e.values {
-			if ts < beforeUnix {
-				delete(e.values, v)
-			}
-		}
-
-		// Remove tags when all values deleted
-		if len(e.values) <= 0 {
-			delete(s.lookups, k)
-		}
-	}
-}
diff --git a/tempodb/search/tag_cache_test.go b/tempodb/search/tag_cache_test.go
deleted file mode 100644
index 913a5e6cdca..00000000000
--- a/tempodb/search/tag_cache_test.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package search
-
-import (
-	"fmt"
-	"strconv"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestSearchTagCacheGetNames(t *testing.T) {
-	c := NewTagCache()
-	c.setEntry(0, "k1", "v1")
-	c.setEntry(0, "k1", "v2")
-	require.Equal(t, []string{"k1"}, c.GetNames())
-}
-
-func TestSearchTagCacheMaxValuesPerTag(t *testing.T) {
-	c := NewTagCache()
-
-	for i := 0; i < maxValuesPerTag+1; i++ {
-		c.setEntry(int64(i), "k", fmt.Sprintf("v%02d", i))
-	}
-
-	vals := c.GetValues("k")
-
-	require.Len(t, vals, maxValuesPerTag)
-	require.Equal(t, "v01", vals[0]) // oldest v0 was evicted
-	require.Equal(t, fmt.Sprintf("v%02d", maxValuesPerTag), vals[maxValuesPerTag-1])
-}
-
-func TestSearchTagCachePurge(t *testing.T) {
-	c := NewTagCache()
-
-	oneMinuteAgo := time.Now().Add(-1 * time.Minute)
-	twoMinutesAgo := time.Now().Add(-2 * time.Minute)
-
-	c.setEntry(twoMinutesAgo.Unix(), "j", "a")
-	c.setEntry(twoMinutesAgo.Unix(), "k", "a")
-	c.setEntry(oneMinuteAgo.Unix(), "k", "b")
-
-	c.PurgeExpired(oneMinuteAgo)
-
-	require.Equal(t, []string{"k"}, c.GetNames())     // Empty tags purged
-	require.Equal(t, []string{"b"}, c.GetValues("k")) // Old values purged
-}
-
-func BenchmarkSearchTagCacheSetEntry(b *testing.B) {
-	c := NewTagCache()
-
-	for i := 0; i < b.N; i++ {
-		c.setEntry(int64(i), "k", strconv.Itoa(b.N))
-	}
-}
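
For reviewers, a minimal standalone sketch of the byte-size guard this PR introduces. It is not part of the diff: the main package and sample values are hypothetical, MapSizeWithinLimit mirrors the new helper in pkg/util/map_size.go, and 50e5 (5 MB) is the default registered for -querier.max-bytes-per-tag-values-query.

package main

import "fmt"

// MapSizeWithinLimit sums the byte length of every key in the set and
// reports whether the total stays strictly below limit, mirroring the
// helper added in pkg/util/map_size.go above.
func MapSizeWithinLimit(uniqueMap map[string]struct{}, limit int) bool {
	var mapSize int
	for key := range uniqueMap {
		mapSize += len(key)
	}
	return mapSize < limit
}

func main() {
	values := map[string]struct{}{}
	for i := 0; i < 1000; i++ {
		values[fmt.Sprintf("value-%04d", i)] = struct{}{} // 10 bytes per key
	}

	// ~10 KB of distinct values is well under the 5 MB default limit.
	fmt.Println(MapSizeWithinLimit(values, 5000000)) // true
	// Against a hypothetical 5 KB per-tenant override the guard trips,
	// and SearchTagValues would return an empty response instead of OOMing.
	fmt.Println(MapSizeWithinLimit(values, 5000)) // false
}

Per the diff, the same check runs twice in the ingester (after scanning live traces, then again after blocks) and once in the querier after merging ingester responses, so an over-limit tenant gets an empty tag-values result at whichever stage first exceeds the limit.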