feat: implement simple Fetch by key for cache items #4032

Merged · 12 commits · Sep 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
**BREAKING CHANGE** Removed `querier_forget_delay` setting from the frontend. This configuration option did nothing.
* [ENHANCEMENT] Update metrics-generator config in Tempo distributed docker compose example to serve TraceQL metrics [#4003](https://github.com/grafana/tempo/pull/4003) (@javiermolinar)
* [ENHANCEMENT] Reduce allocs related to marshalling dedicated columns repeatedly in the query frontend. [#4007](https://github.com/grafana/tempo/pull/4007) (@joe-elliott)
* [ENHANCEMENT] Implement simple Fetch by key for cache items [#4032](https://github.com/grafana/tempo/pull/4032) (@javiermolinar)
* [ENHANCEMENT] Replace Grafana Agent example with Grafana Alloy [#4030](https://github.com/grafana/tempo/pull/4030) (@javiermolinar)
* [ENHANCEMENT] Support exporting internal Tempo traces via OTLP exporter when `use_otel_tracer` is enabled. Use the OpenTelemetry SDK environment variables to configure the span exporter. [#4028](https://github.com/grafana/tempo/pull/4028) (@andreasgerstmayr)
* [ENHANCEMENT] TraceQL metrics queries: add min_over_time [#3975](https://github.com/grafana/tempo/pull/3975) (@javiermolinar)
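For orientation before the diffs: the PR adds a single-key read path, `FetchKey`, next to the existing batch `Fetch` on the cache interface. A minimal sketch of the resulting call pattern, relying only on the `cache.Cache` interface shown in the `pkg/cache/cache.go` diff below; the `fetchCached` helper is illustrative, not part of the PR:

```go
package example

import (
	"context"

	"github.com/grafana/tempo/pkg/cache"
)

// fetchCached is a hypothetical helper showing the new single-key path:
// FetchKey returns the buffer and a found flag, so callers no longer need to
// wrap a single key in a slice just to use the batch Fetch API.
func fetchCached(ctx context.Context, c cache.Cache, key string) ([]byte, bool) {
	if len(key) == 0 {
		return nil, false
	}
	return c.FetchKey(ctx, key)
}
```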
12 changes: 11 additions & 1 deletion example/docker-compose/local/docker-compose.yaml
@@ -12,11 +12,20 @@ services:
volumes:
- ./tempo-data:/var/tempo

memcached:
image: memcached:1.6.29
container_name: memcached
ports:
- "11211:11211"
environment:
- MEMCACHED_MAX_MEMORY=64m # Set the maximum memory usage
- MEMCACHED_THREADS=4 # Number of threads to use

tempo:
image: *tempoImage
command: [ "-config.file=/etc/tempo.yaml" ]
volumes:
- ../shared/tempo.yaml:/etc/tempo.yaml
- ./tempo.yaml:/etc/tempo.yaml
- ./tempo-data:/var/tempo
ports:
- "14268:14268" # jaeger ingest
@@ -27,6 +36,7 @@ services:
- "9411:9411" # zipkin
depends_on:
- init
- memcached

k6-tracing:
image: ghcr.io/grafana/xk6-client-tracing:v0.0.5
70 changes: 70 additions & 0 deletions example/docker-compose/local/tempo.yaml
@@ -0,0 +1,70 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info


cache:
background:
writeback_goroutines: 5
caches:
- roles:
- frontend-search
memcached:
host: localhost:11211

query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s

distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
    jaeger:                            # the receivers all come from the OpenTelemetry collector. more configuration information can
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
thrift_http: #
grpc: # for a production deployment you should only enable the receivers you need!
thrift_binary:
thrift_compact:
zipkin:
otlp:
protocols:
http:
grpc:
opencensus:

ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
compaction:
block_retention: 1h # overall Tempo trace retention. set for demo purposes

metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces

storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks

overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both
4 changes: 1 addition & 3 deletions modules/cache/memcached/memcached.go
@@ -28,9 +28,7 @@ func NewClient(cfg *Config, cfgBackground *cache.BackgroundConfig, name string,

client := cache.NewMemcachedClient(cfg.ClientConfig, name, prometheus.DefaultRegisterer, logger)
memcachedCfg := cache.MemcachedConfig{
Expiration: cfg.TTL,
BatchSize: 0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
Parallelism: 0,
Expiration: cfg.TTL,
}
c := cache.NewMemcached(memcachedCfg, client, name, cfg.ClientConfig.MaxItemSize, prometheus.DefaultRegisterer, logger)

31 changes: 5 additions & 26 deletions modules/frontend/pipeline/sync_handler_cache.go
@@ -9,8 +9,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
)
@@ -39,7 +37,7 @@ func (c cachingWare) RoundTrip(req Request) (*http.Response, error) {
// extract cache key
key := req.CacheKey()
if len(key) > 0 {
body := c.cache.fetchBytes(key)
body := c.cache.fetchBytes(req.Context(), key)
if len(body) > 0 {
resp := &http.Response{
Header: http.Header{},
@@ -145,26 +143,7 @@ func (c *frontendCache) store(ctx context.Context, key string, buffer []byte) {
}

// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
func (c *frontendCache) fetch(key string, pb proto.Message) bool {
if c.c == nil {
return false
}

if len(key) == 0 {
return false
}

_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
if len(bufs) != 1 {
return false
}

err := (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(bufs[0]), pb)
return err == nil
}

// fetch fetches the response body from the cache. the caller assumes the responsibility of closing the response body.
func (c *frontendCache) fetchBytes(key string) []byte {
func (c *frontendCache) fetchBytes(ctx context.Context, key string) []byte {
if c.c == nil {
return nil
}
@@ -173,10 +152,10 @@ func (c *frontendCache) fetchBytes(key string) []byte {
return nil
}

_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
if len(bufs) != 1 {
buf, found := c.c.FetchKey(ctx, key)
if !found {
return nil
}

return bufs[0]
return buf
}
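The middleware now threads the request context into the cache read, so the lookup follows the caller's cancellation and deadlines instead of `context.Background()`. A simplified sketch of the hit path, assuming stand-in names (`byteFetcher`, `serveFromCache`) rather than the PR's unexported `frontendCache`/`cachingWare` types:

```go
package example

import (
	"bytes"
	"context"
	"io"
	"net/http"
)

// byteFetcher mirrors the small surface the middleware needs from the cache
// after this change: a context-aware single-key read. Illustrative only.
type byteFetcher interface {
	fetchBytes(ctx context.Context, key string) []byte
}

// serveFromCache returns a synthetic 200 response when the key is cached and
// nil on a miss, mirroring the RoundTrip hit path in the diff above.
func serveFromCache(c byteFetcher, req *http.Request, key string) *http.Response {
	body := c.fetchBytes(req.Context(), key)
	if len(body) == 0 {
		return nil // miss: fall through to the next round tripper
	}
	return &http.Response{
		StatusCode:    http.StatusOK,
		Header:        http.Header{},
		Body:          io.NopCloser(bytes.NewReader(body)),
		ContentLength: int64(len(body)),
	}
}
```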
5 changes: 3 additions & 2 deletions modules/frontend/pipeline/sync_handler_cache_test.go
@@ -39,8 +39,9 @@ func TestCacheCaches(t *testing.T) {
c.store(context.Background(), testKey, testData)

actual := &tempopb.SearchTagsResponse{}
found := c.fetch(testKey, actual)
buffer := c.fetchBytes(context.Background(), testKey)
err = (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(buffer), actual)

require.True(t, found)
require.NoError(t, err)
require.Equal(t, expected, actual)
}
11 changes: 11 additions & 0 deletions pkg/cache/cache.go
@@ -2,7 +2,9 @@ package cache

import (
"context"
"time"

instr "github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/services"
)

@@ -41,5 +43,14 @@
// TODO: both cached backend clients support deletion. Should we implement?
// Remove(ctx context.Context, key []string)
Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
FetchKey(ctx context.Context, key string) (buf []byte, found bool)
Stop()
}

func measureRequest(ctx context.Context, method string, col instr.Collector, toStatusCode func(error) string, f func(context.Context) error) error {
start := time.Now()
col.Before(ctx, method, start)
err := f(ctx)
col.After(ctx, method, toStatusCode(err), start)
return err
}
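`measureRequest` factors out the Before/After instrumentation pattern that dskit's `instrument.Collector` expects around a single call. A hedged sketch of how a backend in this package might lean on it when implementing `FetchKey`; `memcachedFetcher`, `getOne`, and the `"Memcache.GetKey"` method label are assumptions for illustration, not the PR's actual backend code:

```go
package cache

import (
	"context"

	instr "github.com/grafana/dskit/instrument"
)

// memcachedFetcher is an illustrative stand-in for a real cache backend.
type memcachedFetcher struct {
	requestDuration instr.Collector
	getOne          func(ctx context.Context, key string) ([]byte, error)
}

// FetchKey wraps a single-key read with measureRequest so it reports through
// the same collector as the batch Fetch path.
func (m *memcachedFetcher) FetchKey(ctx context.Context, key string) (buf []byte, found bool) {
	err := measureRequest(ctx, "Memcache.GetKey", m.requestDuration,
		func(error) string { return "200" }, // toStatusCode: simplified for the sketch
		func(ctx context.Context) error {
			var err error
			buf, err = m.getOne(ctx, key)
			return err
		})
	if err != nil {
		return nil, false
	}
	return buf, true
}
```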