diff --git a/CHANGELOG.md b/CHANGELOG.md
index d98622c8604..1c19400ca55 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 **BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
 * [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
 * [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
+* [ENHANCEMENT] Implement simple Fetch by key for cache items [#4032](https://github.com/grafana/tempo/pull/4032) (@javiermolinar)
 * [ENHANCEMENT] Replace Grafana Agent example by Grafana Alloy[#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
 * [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
 * [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)
diff --git a/example/docker-compose/local/docker-compose.yaml b/example/docker-compose/local/docker-compose.yaml
index 8a6512ab916..ee8816aac9a 100644
--- a/example/docker-compose/local/docker-compose.yaml
+++ b/example/docker-compose/local/docker-compose.yaml
@@ -12,11 +12,20 @@ services:
     volumes:
       - ./tempo-data:/var/tempo
 
+  memcached:
+    image: memcached:1.6.29
+    container_name: memcached
+    ports:
+      - "11211:11211"
+    environment:
+      - MEMCACHED_MAX_MEMORY=64m  # Set the maximum memory usage
+      - MEMCACHED_THREADS=4       # Number of threads to use
+
   tempo:
     image: *tempoImage
     command: [ "-config.file=/etc/tempo.yaml" ]
     volumes:
-      - ../shared/tempo.yaml:/etc/tempo.yaml
+      - ./tempo.yaml:/etc/tempo.yaml
       - ./tempo-data:/var/tempo
     ports:
       - "14268:14268"  # jaeger ingest
@@ -27,6 +36,7 @@
       - "9411:9411"    # zipkin
     depends_on:
       - init
+      - memcached
 
   k6-tracing:
     image: ghcr.io/grafana/xk6-client-tracing:v0.0.5
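Reviewer note: before pointing Tempo at the new service, the memcached container can be smoke-tested on its own. A minimal sketch, assuming the compose stack is up with port 11211 published on localhost, and that the grafana/gomemcache fork keeps the upstream `New`/`Set`/`Get` client API (the interface changes later in this diff suggest it does):

```go
// Standalone smoke test for the memcached service added above.
// Not part of the PR; run from the host while the stack is up.
package main

import (
	"fmt"
	"log"

	"github.com/grafana/gomemcache/memcache" // same client library the PR uses
)

func main() {
	client := memcache.New("localhost:11211")

	// Write one item, then read it back with a single-key Get — the same
	// operation the new FetchKey path issues under the hood.
	if err := client.Set(&memcache.Item{Key: "smoke", Value: []byte("ok")}); err != nil {
		log.Fatalf("set failed: %v", err)
	}

	item, err := client.Get("smoke")
	if err != nil {
		log.Fatalf("get failed: %v", err)
	}
	fmt.Printf("memcached answered: %s\n", item.Value)
}
```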
diff --git a/example/docker-compose/local/tempo.yaml b/example/docker-compose/local/tempo.yaml
new file mode 100644
index 00000000000..9ddf6f1da1b
--- /dev/null
+++ b/example/docker-compose/local/tempo.yaml
@@ -0,0 +1,70 @@
+stream_over_http_enabled: true
+server:
+  http_listen_port: 3200
+  log_level: info
+
+
+cache:
+  background:
+    writeback_goroutines: 5
+  caches:
+    - roles:
+        - frontend-search
+      memcached:
+        host: memcached:11211
+
+query_frontend:
+  search:
+    duration_slo: 5s
+    throughput_bytes_slo: 1.073741824e+09
+  trace_by_id:
+    duration_slo: 5s
+
+distributor:
+  receivers:           # this configuration will listen on all ports and protocols that tempo is capable of.
+    jaeger:            # the receivers all come from the OpenTelemetry collector. more configuration information can
+      protocols:       # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
+        thrift_http:   #
+        grpc:          # for a production deployment you should only enable the receivers you need!
+        thrift_binary:
+        thrift_compact:
+    zipkin:
+    otlp:
+      protocols:
+        http:
+        grpc:
+    opencensus:
+
+ingester:
+  max_block_duration: 5m               # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
+
+compactor:
+  compaction:
+    block_retention: 1h                # overall Tempo trace retention. set for demo purposes
+
+metrics_generator:
+  registry:
+    external_labels:
+      source: tempo
+      cluster: docker-compose
+  storage:
+    path: /var/tempo/generator/wal
+    remote_write:
+      - url: http://prometheus:9090/api/v1/write
+        send_exemplars: true
+  traces_storage:
+    path: /var/tempo/generator/traces
+
+storage:
+  trace:
+    backend: local                     # backend configuration to use
+    wal:
+      path: /var/tempo/wal             # where to store the wal locally
+    local:
+      path: /var/tempo/blocks
+
+overrides:
+  defaults:
+    metrics_generator:
+      processors: [service-graphs, span-metrics, local-blocks]  # enables metrics generator
+      generate_native_histograms: both
diff --git a/modules/cache/memcached/memcached.go b/modules/cache/memcached/memcached.go
index e442ede4c97..e4dd12ca8f2 100644
--- a/modules/cache/memcached/memcached.go
+++ b/modules/cache/memcached/memcached.go
@@ -28,9 +28,7 @@ func NewClient(cfg *Config, cfgBackground *cache.BackgroundConfig, name string, logger log.Logger) cache.Cache {
 	client := cache.NewMemcachedClient(cfg.ClientConfig, name, prometheus.DefaultRegisterer, logger)
 
 	memcachedCfg := cache.MemcachedConfig{
-		Expiration:  cfg.TTL,
-		BatchSize:   0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
-		Parallelism: 0,
+		Expiration: cfg.TTL,
 	}
 
 	c := cache.NewMemcached(memcachedCfg, client, name, cfg.ClientConfig.MaxItemSize, prometheus.DefaultRegisterer, logger)
diff --git a/modules/frontend/pipeline/sync_handler_cache.go b/modules/frontend/pipeline/sync_handler_cache.go
index 0df19da1e25..77437797352 100644
--- a/modules/frontend/pipeline/sync_handler_cache.go
+++ b/modules/frontend/pipeline/sync_handler_cache.go
@@ -9,8 +9,6 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/gogo/protobuf/jsonpb"
-	"github.com/gogo/protobuf/proto"
 	"github.com/grafana/tempo/pkg/api"
 	"github.com/grafana/tempo/pkg/cache"
 )
@@ -39,7 +37,7 @@ func (c cachingWare) RoundTrip(req Request) (*http.Response, error) {
 	// extract cache key
 	key := req.CacheKey()
 	if len(key) > 0 {
-		body := c.cache.fetchBytes(key)
+		body := c.cache.fetchBytes(req.Context(), key)
 		if len(body) > 0 {
 			resp := &http.Response{
 				Header:     http.Header{},
@@ -145,26 +143,7 @@ func (c *frontendCache) store(ctx context.Context, key string, buffer []byte) {
 }
 
 // fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
-func (c *frontendCache) fetch(key string, pb proto.Message) bool {
-	if c.c == nil {
-		return false
-	}
-
-	if len(key) == 0 {
-		return false
-	}
-
-	_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
-	if len(bufs) != 1 {
-		return false
-	}
-
-	err := (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(bufs[0]), pb)
-	return err == nil
-}
-
-// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
-func (c *frontendCache) fetchBytes(key string) []byte {
+func (c *frontendCache) fetchBytes(ctx context.Context, key string) []byte {
 	if c.c == nil {
 		return nil
 	}
@@ -173,10 +152,10 @@ func (c *frontendCache) fetchBytes(key string) []byte {
 		return nil
 	}
 
-	_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
-	if len(bufs) != 1 {
+	buf, found := c.c.FetchKey(ctx, key)
+	if !found {
 		return nil
 	}
 
-	return bufs[0]
+	return buf
 }
diff --git a/modules/frontend/pipeline/sync_handler_cache_test.go b/modules/frontend/pipeline/sync_handler_cache_test.go
index 96d53788857..7ef27c3408b 100644
--- a/modules/frontend/pipeline/sync_handler_cache_test.go
+++ b/modules/frontend/pipeline/sync_handler_cache_test.go
@@ -39,8 +39,9 @@ func TestCacheCaches(t *testing.T) {
 	c.store(context.Background(), testKey, testData)
 
 	actual := &tempopb.SearchTagsResponse{}
-	found := c.fetch(testKey, actual)
+	buffer := c.fetchBytes(context.Background(), testKey)
+	err = (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(buffer), actual)
 
-	require.True(t, found)
+	require.NoError(t, err)
 	require.Equal(t, expected, actual)
 }
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index 6ec3059e308..add850e4abe 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -2,7 +2,9 @@ package cache
 
 import (
 	"context"
+	"time"
 
+	instr "github.com/grafana/dskit/instrument"
 	"github.com/grafana/dskit/services"
 )
@@ -41,5 +43,14 @@ type Cache interface {
 	// TODO: both cached backend clients support deletion. Should we implement?
 	// Remove(ctx context.Context, key []string)
 	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
+	FetchKey(ctx context.Context, key string) (buf []byte, found bool)
 	Stop()
 }
+
+func measureRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error {
+	start := time.Now()
+	col.Before(ctx, method, start)
+	err := f(ctx)
+	col.After(ctx, method, toStatusCode(err), start)
+	return err
+}
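Reviewer note: with `measureRequest` hoisted into cache.go, memcached and redis now share one instrumentation wrapper. The sketch below re-creates the pattern in isolation so it can be run directly; `collector` and `logCollector` are local stand-ins for dskit's `instrument` collector, invented here for illustration only.

```go
// Self-contained illustration of the measureRequest pattern that cache.go
// now shares between backends: time the callback, then report its duration
// under a status code derived from the returned error.
package main

import (
	"context"
	"fmt"
	"time"
)

// collector mirrors the two methods measureRequest needs from a metrics sink.
type collector interface {
	Before(ctx context.Context, method string, start time.Time)
	After(ctx context.Context, method, statusCode string, start time.Time)
}

type logCollector struct{}

func (logCollector) Before(_ context.Context, method string, _ time.Time) {
	fmt.Printf("start %s\n", method)
}

func (logCollector) After(_ context.Context, method, statusCode string, start time.Time) {
	fmt.Printf("done %s status=%s took=%s\n", method, statusCode, time.Since(start))
}

func measureRequest(ctx context.Context, method string, col collector, toStatusCode func(error) string, f func(context.Context) error) error {
	start := time.Now()
	col.Before(ctx, method, start)
	err := f(ctx)
	col.After(ctx, method, toStatusCode(err), start)
	return err
}

func main() {
	status := func(err error) string {
		if err != nil {
			return "500"
		}
		return "200"
	}
	_ = measureRequest(context.Background(), "Toy.Get", logCollector{}, status, func(context.Context) error {
		time.Sleep(10 * time.Millisecond) // pretend to hit the cache
		return nil
	})
}
```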
diff --git a/pkg/cache/memcached.go b/pkg/cache/memcached.go
index 99b162f941c..cfe6e010aa6 100644
--- a/pkg/cache/memcached.go
+++ b/pkg/cache/memcached.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"flag"
-	"sync"
 	"time"
 
 	"github.com/go-kit/log"
@@ -13,39 +12,26 @@ import (
 	"github.com/grafana/gomemcache/memcache"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-
-	"github.com/grafana/tempo/pkg/util/math"
 )
 
 // MemcachedConfig is config to make a Memcached
 type MemcachedConfig struct {
 	Expiration time.Duration `yaml:"expiration"`
-
-	BatchSize   int `yaml:"batch_size"`
-	Parallelism int `yaml:"parallelism"`
 }
 
 // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
 func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.")
-	f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 1024, description+"How many keys to fetch in each batch.")
-	f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 100, description+"Maximum active requests to memcache.")
 }
 
 // Memcached type caches chunks in memcached
 type Memcached struct {
-	cfg         MemcachedConfig
-	memcache    MemcachedClient
-	name        string
-	maxItemSize int
-
+	cfg             MemcachedConfig
+	memcache        MemcachedClient
+	name            string
+	maxItemSize     int
 	requestDuration *instr.HistogramCollector
-
-	wg      sync.WaitGroup
-	inputCh chan *work
-	quit    chan struct{}
-
-	logger log.Logger
+	logger          log.Logger
 }
 
 // NewMemcached makes a new Memcached.
@@ -70,56 +56,9 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, maxItemSize int, reg prometheus.Registerer, logger log.Logger) *Memcached {
 		}, []string{"method", "status_code"}),
 		),
 	}
-
-	if cfg.BatchSize == 0 || cfg.Parallelism == 0 {
-		return c
-	}
-
-	c.inputCh = make(chan *work)
-	c.quit = make(chan struct{})
-	c.wg.Add(cfg.Parallelism)
-
-	for i := 0; i < cfg.Parallelism; i++ {
-		go func() {
-			defer c.wg.Done()
-			for {
-				select {
-				case <-c.quit:
-					return
-				case input := <-c.inputCh:
-					res := &result{
-						batchID: input.batchID,
-					}
-					res.found, res.bufs, res.missed = c.fetch(input.ctx, input.keys)
-					// No-one will be reading from resultCh if we were asked to quit
-					// during the fetch, so check again before writing to it.
-					select {
-					case <-c.quit:
-						return
-					case input.resultCh <- res:
-					}
-				}
-			}
-		}()
-	}
-
 	return c
 }
 
-type work struct {
-	keys     []string
-	ctx      context.Context
-	resultCh chan<- *result
-	batchID  int // For ordering results.
-}
-
-type result struct {
-	found   []string
-	bufs    [][]byte
-	missed  []string
-	batchID int // For ordering results.
-}
-
 func memcacheStatusCode(err error) string {
 	// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
 	if errors.Is(err, memcache.ErrCacheMiss) {
@@ -136,21 +75,37 @@ func memcacheStatusCode(err error) string {
 // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
 func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
-	if c.cfg.BatchSize == 0 {
-		found, bufs, missed = c.fetch(ctx, keys)
-		return
-	}
-	_ = measureRequest(ctx, "Memcache.GetBatched", c.requestDuration, memcacheStatusCode, func(ctx context.Context) error {
-		found, bufs, missed = c.fetchKeysBatched(ctx, keys)
-		return nil
-	})
+	found, bufs, missed = c.fetch(ctx, keys)
 	return
 }
 
+// FetchKey gets a single key from the cache
+func (c *Memcached) FetchKey(ctx context.Context, key string) (buf []byte, found bool) {
+	const method = "Memcache.Get"
+	var item *memcache.Item
+	err := measureRequest(ctx, method, c.requestDuration, memcacheStatusCode, func(_ context.Context) error {
+		var err error
+		item, err = c.memcache.Get(key)
+		if err != nil {
+			if errors.Is(err, memcache.ErrCacheMiss) {
+				level.Debug(c.logger).Log("msg", "Failed to get key from memcached", "err", err, "key", key)
+			} else {
+				level.Error(c.logger).Log("msg", "Error getting key from memcached", "err", err, "key", key)
+			}
+		}
+		return err
+	})
+	if err != nil {
+		return buf, false
+	}
+	return item.Value, true
+}
+
 func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
 	var items map[string]*memcache.Item
 	const method = "Memcache.GetMulti"
-	err := measureRequest(ctx, method, c.requestDuration, memcacheStatusCode, func(innerCtx context.Context) error {
+
+	err := measureRequest(ctx, method, c.requestDuration, memcacheStatusCode, func(_ context.Context) error {
 		var err error
 		items, err = c.memcache.GetMulti(keys)
 		if err != nil {
@@ -161,7 +116,6 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
 	if err != nil {
 		return found, bufs, keys
 	}
-
 	for _, key := range keys {
 		item, ok := items[key]
 		if ok {
@@ -174,57 +128,6 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
 	return
 }
 
-func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
-	resultsCh := make(chan *result)
-	batchSize := c.cfg.BatchSize
-
-	go func() {
-		for i, j := 0, 0; i < len(keys); i += batchSize {
-			batchKeys := keys[i:math.Min(i+batchSize, len(keys))]
-			select {
-			case <-c.quit:
-				return
-			case c.inputCh <- &work{
-				keys:     batchKeys,
-				ctx:      ctx,
-				resultCh: resultsCh,
-				batchID:  j,
-			}:
-			}
-			j++
-		}
-	}()
-
-	// Read all values from this channel to avoid blocking upstream.
-	numResults := len(keys) / batchSize
-	if len(keys)%batchSize != 0 {
-		numResults++
-	}
-
-	// We need to order found by the input keys order.
-	results := make([]*result, numResults)
-loopResults:
-	for i := 0; i < numResults; i++ {
-		select {
-		case <-c.quit:
-			break loopResults
-		case result := <-resultsCh:
-			results[result.batchID] = result
-		}
-	}
-
-	for _, result := range results {
-		if result == nil {
-			continue
-		}
-		found = append(found, result.found...)
-		bufs = append(bufs, result.bufs...)
-		missed = append(missed, result.missed...)
-	}
-
-	return
-}
-
 // Store stores the key in the cache.
 func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
 	for i := range keys {
@@ -242,26 +145,8 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
 	}
 }
 
-func measureRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error {
-	start := time.Now()
-	col.Before(ctx, method, start)
-	err := f(ctx)
-	col.After(ctx, method, toStatusCode(err), start)
-	return err
-}
-
-// Stop does nothing.
 func (c *Memcached) Stop() {
-	if c.quit == nil {
-		return
-	}
-
-	select {
-	case <-c.quit:
-	default:
-		close(c.quit)
-	}
-	c.wg.Wait()
+	c.memcache.Close()
 }
 
 func (c *Memcached) MaxItemSize() int {
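Reviewer note: the simplest way to see the new single-key path end to end is the shape the updated tests use — an in-memory `MemcachedClient` wired into `NewMemcached`. A sketch along those lines; `fakeClient` and the key names are invented for illustration, while the constructor and interface signatures come straight from this diff (it assumes a module that already contains these changes):

```go
// Sketch of Memcached.FetchKey against an in-memory client implementing the
// four-method MemcachedClient interface introduced by this PR.
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/grafana/gomemcache/memcache"
	"github.com/grafana/tempo/pkg/cache"
)

type fakeClient struct {
	data map[string][]byte
}

func (f *fakeClient) Get(key string, _ ...memcache.Option) (*memcache.Item, error) {
	if v, ok := f.data[key]; ok {
		return &memcache.Item{Value: v}, nil
	}
	return nil, memcache.ErrCacheMiss // a miss surfaces as found=false in FetchKey
}

func (f *fakeClient) GetMulti(keys []string, _ ...memcache.Option) (map[string]*memcache.Item, error) {
	items := map[string]*memcache.Item{}
	for _, k := range keys {
		if v, ok := f.data[k]; ok {
			items[k] = &memcache.Item{Value: v}
		}
	}
	return items, nil
}

func (f *fakeClient) Set(item *memcache.Item) error {
	f.data[item.Key] = item.Value
	return nil
}

func (f *fakeClient) Close() {}

func main() {
	client := &fakeClient{data: map[string][]byte{}}
	c := cache.NewMemcached(cache.MemcachedConfig{}, client, "example", 0, nil, log.NewNopLogger())

	ctx := context.Background()
	c.Store(ctx, []string{"trace:1"}, [][]byte{[]byte("hello")})

	if buf, found := c.FetchKey(ctx, "trace:1"); found {
		fmt.Printf("hit: %s\n", buf) // hit: hello
	}
	if _, found := c.FetchKey(ctx, "missing"); !found {
		fmt.Println("miss, as expected")
	}
}
```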
diff --git a/pkg/cache/memcached_client.go b/pkg/cache/memcached_client.go
index 4d1bddd07c9..a661b7d9f56 100644
--- a/pkg/cache/memcached_client.go
+++ b/pkg/cache/memcached_client.go
@@ -28,6 +28,8 @@ type MemcachedClient interface {
 	GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error)
 	Set(item *memcache.Item) error
+	Get(key string, opts ...memcache.Option) (*memcache.Item, error)
+	Close()
 }
 
 type serverSelector interface {
diff --git a/pkg/cache/memcached_client_test.go b/pkg/cache/memcached_client_test.go
index d1b2008230b..16667af42ea 100644
--- a/pkg/cache/memcached_client_test.go
+++ b/pkg/cache/memcached_client_test.go
@@ -37,3 +37,19 @@ func (m *mockMemcache) Set(item *memcache.Item) error {
 	m.contents[item.Key] = item.Value
 	return nil
 }
+
+func (m *mockMemcache) Get(key string, _ ...memcache.Option) (*memcache.Item, error) {
+	m.RLock()
+	defer m.RUnlock()
+
+	if c, ok := m.contents[key]; ok {
+		return &memcache.Item{
+			Value: c,
+		}, nil
+	}
+
+	return nil, memcache.ErrCacheMiss
+}
+
+func (m *mockMemcache) Close() {
+}
diff --git a/pkg/cache/memcached_test.go b/pkg/cache/memcached_test.go
index 54598a6e49a..370ba21121f 100644
--- a/pkg/cache/memcached_test.go
+++ b/pkg/cache/memcached_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/grafana/gomemcache/memcache"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 
@@ -22,16 +23,6 @@ func TestMemcached(t *testing.T) {
 		testMemcache(t, memcache)
 	})
-
-	t.Run("batched", func(t *testing.T) {
-		client := newMockMemcache()
-		memcache := cache.NewMemcached(cache.MemcachedConfig{
-			BatchSize:   10,
-			Parallelism: 5,
-		}, client, "test", 0, nil, log.NewNopLogger())
-
-		testMemcache(t, memcache)
-	})
 }
 
 func testMemcache(t *testing.T, memcache *cache.Memcached) {
@@ -67,6 +58,12 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) {
 		found = found[1:]
 		bufs = bufs[1:]
 	}
+
+	_, foundKey := memcache.FetchKey(ctx, "1")
+	assert.True(t, foundKey)
+
+	_, foundKey = memcache.FetchKey(ctx, "5")
+	assert.False(t, foundKey)
 }
 
 // mockMemcache whose calls fail 1/3rd of the time.
@@ -98,16 +95,6 @@ func TestMemcacheFailure(t *testing.T) {
 		testMemcacheFailing(t, memcache)
 	})
-
-	t.Run("batched", func(t *testing.T) {
-		client := newMockMemcacheFailing()
-		memcache := cache.NewMemcached(cache.MemcachedConfig{
-			BatchSize:   10,
-			Parallelism: 5,
-		}, client, "test", 0, nil, log.NewNopLogger())
-
-		testMemcacheFailing(t, memcache)
-	})
 }
 
 func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) {
@@ -165,16 +152,6 @@ func TestMemcacheStop(t *testing.T) {
 		testMemcachedStopping(memcache)
 	})
-
-	t.Run("batched", func(_ *testing.T) {
-		client := newMockMemcacheFailing()
-		memcache := cache.NewMemcached(cache.MemcachedConfig{
-			BatchSize:   10,
-			Parallelism: 5,
-		}, client, "test", 0, nil, log.NewNopLogger())
-
-		testMemcachedStopping(memcache)
-	})
 }
 
 func testMemcachedStopping(memcache *cache.Memcached) {
diff --git a/pkg/cache/redis_cache.go b/pkg/cache/redis_cache.go
index 1f2451add4d..c5494a92d79 100644
--- a/pkg/cache/redis_cache.go
+++ b/pkg/cache/redis_cache.go
@@ -2,14 +2,17 @@ package cache
 
 import (
 	"context"
+	"errors"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/go-redis/redis/v8"
 	instr "github.com/grafana/dskit/instrument"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	util_log "github.com/grafana/tempo/pkg/util/log"
 	"github.com/grafana/tempo/pkg/util/spanlogger"
@@ -64,11 +67,8 @@ func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
 	const method = "RedisCache.MGet"
 	var items [][]byte
 	// Run a tracked request, using c.requestDuration to monitor requests.
-	err := instr.CollectedRequest(ctx, method, c.requestDuration, redisStatusCode, func(ctx context.Context) error {
-		log, _ := spanlogger.New(ctx, method)
-		defer log.End()
-		log.SetAttributes(attribute.Int("keys requested", len(keys)))
-
+	err := measureRequest(ctx, method, c.requestDuration, redisStatusCode, func(ctx context.Context) error {
+		log := spanlogger.FromContext(ctx)
 		var err error
 		items, err = c.redis.MGet(ctx, keys)
 		if err != nil {
@@ -77,9 +77,7 @@
 			level.Error(c.logger).Log("msg", "failed to get from redis", "name", c.name, "err", err)
 			return err
 		}
-
-		log.SetAttributes(attribute.Int("keys found", len(items)))
-
+		log.AddEvent("cache.keys.found", trace.WithAttributes(attribute.Int("keys", len(keys))))
 		return nil
 	})
 	if err != nil {
@@ -98,6 +96,36 @@
 	return
 }
 
+// FetchKey gets a single key from the cache
+func (c *RedisCache) FetchKey(ctx context.Context, key string) (buf []byte, found bool) {
+	const method = "RedisCache.Get"
+	// Run a tracked request, using c.requestDuration to monitor requests.
+	err := measureRequest(ctx, method, c.requestDuration, redisStatusCode, func(ctx context.Context) error {
+		log := spanlogger.FromContext(ctx)
+		var err error
+		buf, err = c.redis.Get(ctx, key)
+		if err != nil {
+			// nolint:errcheck
+			log.Error(err)
+			if errors.Is(err, redis.Nil) {
+				level.Debug(c.logger).Log("msg", "failed to get key from redis", "name", c.name, "err", err, "key", key)
+				log.AddEvent("cache.key.missed", trace.WithAttributes(attribute.String("key", key)))
+			} else {
+				level.Error(c.logger).Log("msg", "error requesting key from redis", "name", c.name, "err", err, "key", key)
+			}
+
+			return err
+		}
+		log.AddEvent("cache.key.found", trace.WithAttributes(attribute.String("key", key)))
+		return nil
+	})
+	if err != nil {
+		return buf, false
+	}
+
+	return buf, true
+}
+
 // Store stores the key in the cache.
 func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
 	err := c.redis.MSet(ctx, keys, bufs)
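Reviewer note: the miss/error split in `RedisCache.FetchKey` relies on go-redis reporting absent keys with the sentinel error `redis.Nil` rather than a real failure. A minimal standalone sketch of that contract (assumes a Redis reachable at localhost:6379):

```go
// Demonstrates the redis.Nil sentinel that FetchKey treats as a cache miss.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	val, err := rdb.Get(ctx, "no-such-key").Result()
	switch {
	case errors.Is(err, redis.Nil):
		// A miss is not a failure: FetchKey logs this at debug level
		// and reports found=false.
		fmt.Println("cache miss")
	case err != nil:
		// Anything else is a genuine error, logged at error level.
		fmt.Println("redis error:", err)
	default:
		fmt.Println("hit:", val)
	}
}
```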
diff --git a/pkg/cache/redis_cache_test.go b/pkg/cache/redis_cache_test.go
index d4490615851..59e904d94a9 100644
--- a/pkg/cache/redis_cache_test.go
+++ b/pkg/cache/redis_cache_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/alicebob/miniredis/v2"
 	"github.com/go-kit/log"
 	"github.com/go-redis/redis/v8"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -40,6 +41,9 @@ func TestRedisCache(t *testing.T) {
 		require.Equal(t, bufs[i], data[i])
 	}
 
+	_, foundKey := c.FetchKey(ctx, "key1")
+	assert.True(t, foundKey)
+
 	// test misses
 	found, _, missed = c.Fetch(ctx, miss)
 
@@ -48,6 +52,9 @@ func TestRedisCache(t *testing.T) {
 	for i := 0; i < nMiss; i++ {
 		require.Equal(t, miss[i], missed[i])
 	}
+
+	_, foundKey = c.FetchKey(ctx, miss[0])
+	assert.False(t, foundKey)
 }
 
 func mockRedisCache() (*RedisCache, error) {
diff --git a/pkg/cache/redis_client.go b/pkg/cache/redis_client.go
index 61b2e9dda00..6a6b5dc1eca 100644
--- a/pkg/cache/redis_client.go
+++ b/pkg/cache/redis_client.go
@@ -159,6 +159,21 @@ func (c *RedisClient) MGet(ctx context.Context, keys []string) ([][]byte, error) {
 	return ret, nil
 }
 
+func (c *RedisClient) Get(ctx context.Context, key string) ([]byte, error) {
+	var cancel context.CancelFunc
+	if c.timeout > 0 {
+		ctx, cancel = context.WithTimeout(ctx, c.timeout)
+		defer cancel()
+	}
+	cmd := c.rdb.Get(ctx, key)
+	err := cmd.Err()
+	if err != nil {
+		return nil, err
+	}
+
+	return StringToBytes(cmd.Val()), nil
+}
+
 func (c *RedisClient) Close() error {
 	return c.rdb.Close()
 }
diff --git a/pkg/util/test/cache.go b/pkg/util/test/cache.go
index 7a6ffb2243c..f63c5879a1f 100644
--- a/pkg/util/test/cache.go
+++ b/pkg/util/test/cache.go
@@ -36,6 +36,16 @@ func (m *mockClient) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
 	return
 }
 
+func (m *mockClient) FetchKey(_ context.Context, key string) (buf []byte, found bool) {
+	m.Lock()
+	defer m.Unlock()
+	buf, ok := m.cache[key]
+	if ok {
+		return buf, true
+	}
+	return buf, false
+}
+
 func (m *mockClient) MaxItemSize() int {
 	return 0
 }
diff --git a/tempodb/backend/cache/cache.go b/tempodb/backend/cache/cache.go
index 8a9717f97cc..cd58550fe30 100644
--- a/tempodb/backend/cache/cache.go
+++ b/tempodb/backend/cache/cache.go
@@ -84,9 +84,9 @@ func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, cacheInfo *backend.CacheInfo) (io.ReadCloser, int64, error) {
 	cache := r.cacheFor(cacheInfo)
 	if cache != nil {
 		k = key(keypath, name)
-		found, vals, _ := cache.Fetch(ctx, []string{k})
-		if len(found) > 0 {
-			return io.NopCloser(bytes.NewReader(vals[0])), int64(len(vals[0])), nil
+		b, found := cache.FetchKey(ctx, k)
+		if found {
+			return io.NopCloser(bytes.NewReader(b)), int64(len(b)), nil
 		}
 	}
@@ -115,9 +115,9 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, keypath backend.KeyPath, offset uint64, buffer []byte, cacheInfo *backend.CacheInfo) error {
 		keyGen := keypath
 		keyGen = append(keyGen, strconv.Itoa(int(offset)), strconv.Itoa(len(buffer)))
 		k = strings.Join(keyGen, ":")
-		found, vals, _ := cache.Fetch(ctx, []string{k})
-		if len(found) > 0 {
-			copy(buffer, vals[0])
+		b, found := cache.FetchKey(ctx, k)
+		if found {
+			copy(buffer, b)
 			return nil
 		}
 	}
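Reviewer note: the tempodb change above is a classic read-through: try the cache with a single key, fall back to the backend, then repopulate so the next caller hits the single-key path. A generic sketch of that shape against the post-PR `Cache` interface — `readThrough` and its parameters are invented for illustration, and the interface is assumed to keep the `Store` method implemented by both backends in this diff:

```go
// Package cacheutil sketches a read-through helper over pkg/cache.Cache.
package cacheutil

import (
	"context"

	"github.com/grafana/tempo/pkg/cache"
)

// readThrough returns the cached bytes for key; on a miss it calls fetch
// against the backing store and writes the result back into the cache.
func readThrough(ctx context.Context, c cache.Cache, key string, fetch func(context.Context) ([]byte, error)) ([]byte, error) {
	// Fast path: one round trip for one key, instead of the old
	// Fetch(ctx, []string{key}) multi-get with a single element.
	if buf, found := c.FetchKey(ctx, key); found {
		return buf, nil
	}

	buf, err := fetch(ctx)
	if err != nil {
		return nil, err
	}

	c.Store(ctx, []string{key}, [][]byte{buf})
	return buf, nil
}
```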