Skip to content

Commit

Permalink
feat: implement simple Fetch by key for cache items (#4032)
Browse files Browse the repository at this point in the history
* implement simple cache GET

* make better errors and add cache to tempo local

* improve memcache logs

* delete unused method

* changelog

* use new method in other places

* use otel tracing

* log missed and hitted keys

* remove batched requests

* remove span loggers from memcached
  • Loading branch information
javiermolinar authored Sep 10, 2024
1 parent 4013355 commit e3f8096
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 222 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
* [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Implement simple Fetch by key for cache items [#4032](https://github.com/grafana/tempo/pull/4032) (@javiermolinar)
* [ENHANCEMENT] Replace Grafana Agent example by Grafana Alloy[#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
* [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)
Expand Down
12 changes: 11 additions & 1 deletion example/docker-compose/local/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,20 @@ services:
volumes:
- ./tempo-data:/var/tempo

memcached:
image: memcached:1.6.29
container_name: memcached
ports:
- "11211:11211"
environment:
- MEMCACHED_MAX_MEMORY=64m # Set the maximum memory usage
- MEMCACHED_THREADS=4 # Number of threads to use

tempo:
image: *tempoImage
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ../shared/tempo.yaml:/etc/tempo.yaml
- ./tempo.yaml:/etc/tempo.yaml
- ./tempo-data:/var/tempo
ports:
- "14268:14268" # jaeger ingest
Expand All @@ -27,6 +36,7 @@ services:
- "9411:9411" # zipkin
depends_on:
- init
- memcached

k6-tracing:
image: ghcr.io/grafana/xk6-client-tracing:v0.0.5
Expand Down
70 changes: 70 additions & 0 deletions example/docker-compose/local/tempo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info


cache:
background:
writeback_goroutines: 5
caches:
- roles:
- frontend-search
memcached:
host: localhost:11211

query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s

distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
thrift_http: #
grpc: # for a production deployment you should only enable the receivers you need!
thrift_binary:
thrift_compact:
zipkin:
otlp:
protocols:
http:
grpc:
opencensus:

ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
compaction:
block_retention: 1h # overall Tempo trace retention. set for demo purposes

metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces

storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks

overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both
4 changes: 1 addition & 3 deletions modules/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ func NewClient(cfg *Config, cfgBackground *cache.BackgroundConfig, name string,

client := cache.NewMemcachedClient(cfg.ClientConfig, name, prometheus.DefaultRegisterer, logger)
memcachedCfg := cache.MemcachedConfig{
Expiration: cfg.TTL,
BatchSize: 0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
Parallelism: 0,
Expiration: cfg.TTL,
}
c := cache.NewMemcached(memcachedCfg, client, name, cfg.ClientConfig.MaxItemSize, prometheus.DefaultRegisterer, logger)

Expand Down
31 changes: 5 additions & 26 deletions modules/frontend/pipeline/sync_handler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
)
Expand Down Expand Up @@ -39,7 +37,7 @@ func (c cachingWare) RoundTrip(req Request) (*http.Response, error) {
// extract cache key
key := req.CacheKey()
if len(key) > 0 {
body := c.cache.fetchBytes(key)
body := c.cache.fetchBytes(req.Context(), key)
if len(body) > 0 {
resp := &http.Response{
Header: http.Header{},
Expand Down Expand Up @@ -145,26 +143,7 @@ func (c *frontendCache) store(ctx context.Context, key string, buffer []byte) {
}

// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
func (c *frontendCache) fetch(key string, pb proto.Message) bool {
if c.c == nil {
return false
}

if len(key) == 0 {
return false
}

_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
if len(bufs) != 1 {
return false
}

err := (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(bufs[0]), pb)
return err == nil
}

// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
func (c *frontendCache) fetchBytes(key string) []byte {
func (c *frontendCache) fetchBytes(ctx context.Context, key string) []byte {
if c.c == nil {
return nil
}
Expand All @@ -173,10 +152,10 @@ func (c *frontendCache) fetchBytes(key string) []byte {
return nil
}

_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
if len(bufs) != 1 {
buf, found := c.c.FetchKey(ctx, key)
if !found {
return nil
}

return bufs[0]
return buf
}
5 changes: 3 additions & 2 deletions modules/frontend/pipeline/sync_handler_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func TestCacheCaches(t *testing.T) {
c.store(context.Background(), testKey, testData)

actual := &tempopb.SearchTagsResponse{}
found := c.fetch(testKey, actual)
buffer := c.fetchBytes(context.Background(), testKey)
err = (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(buffer), actual)

require.True(t, found)
require.NoError(t, err)
require.Equal(t, expected, actual)
}
11 changes: 11 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cache

import (
"context"
"time"

instr "github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/services"
)

Expand Down Expand Up @@ -41,5 +43,14 @@ type Cache interface {
// TODO: both cached backend clients support deletion. Should we implement?
// Remove(ctx context.Context, key []string)
Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
FetchKey(ctx context.Context, key string) (buf []byte, found bool)
Stop()
}

func measureRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error {
start := time.Now()
col.Before(ctx, method, start)
err := f(ctx)
col.After(ctx, method, toStatusCode(err), start)
return err
}
Loading

0 comments on commit e3f8096

Please sign in to comment.