Skip to content

Commit

Permalink
Check cached postings TTL before returning from cache
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Jan 17, 2025
1 parent c9e2f54 commit b826f27
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ type postingsForMatcherPromise struct {
// callers contexts get canceled.
callersCtxTracker *contextsTracker

// Keep track of the time this promise was evaluated, so we can understand the age of this cache entry in traces.
evaluatedAt time.Time
// Keep track of the time this promise started being evaluated, so we can understand the age of this cache entry in traces.
evaluationStartedAt time.Time

// Keep track of the time this promise completed evaluation.
// Do not access these fields until the done channel is closed.
evaluationCompletedAt time.Time

// The result of the promise is stored either in cloner or err (only of the two is valued).
// Do not access these fields until the done channel is closed.
Expand Down Expand Up @@ -174,9 +178,9 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
promiseCallersCtxTracker, promiseExecCtx := newContextsTracker()
ts := c.timeNow()
promise := &postingsForMatcherPromise{
evaluatedAt: ts,
done: make(chan struct{}),
callersCtxTracker: promiseCallersCtxTracker,
evaluationStartedAt: ts,
done: make(chan struct{}),
callersCtxTracker: promiseCallersCtxTracker,
}

// Add the caller context to the ones tracked by the new promise.
Expand All @@ -200,6 +204,26 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex

oldPromise := oldPromiseValue.(*postingsForMatcherPromise)

// Check if the promise already completed the execution and its TTL has not expired yet.
// If the TTL has expired, we don't want to return it, so we just recompute the postings
// on-the-fly, bypassing the cache logic. It's less performant, but more accurate, because
// avoids returning stale data.
if c.ttl > 0 {
select {
case <-oldPromise.done:
if c.timeNow().Sub(oldPromise.evaluationCompletedAt) >= c.ttl {
// The cached promise already expired, but it has not been evicted.
// TODO trace + metric
return func(ctx context.Context) (index.Postings, error) {
return c.postingsForMatchers(ctx, ix, ms...)
}
}

default:
// The evaluation is still in-flight. We wait for it.
}
}

// Add the caller context to the ones tracked by the old promise (currently in-flight).
if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) {
// We've hit a race condition happening when the "loaded" promise execution was just canceled,
Expand All @@ -218,15 +242,15 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex

span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes(
attribute.String("cache_key", key),
attribute.Int64("cache_entry_evaluated_at", promise.evaluatedAt.Unix()),
attribute.Int64("cache_entry_evaluated_at", promise.evaluationStartedAt.Unix()),
))

return oldPromise.result
}

span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes(
attribute.String("cache_key", key),
attribute.Int64("cache_entry_evaluated_at", promise.evaluatedAt.Unix()),
attribute.Int64("cache_entry_evaluated_at", promise.evaluationStartedAt.Unix()),
))

// promise was stored, close its channel after fulfilment
Expand All @@ -244,13 +268,16 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex
promise.cloner = index.NewPostingsCloner(postings)
}

// Keep track of when the evaluation completed.
promise.evaluationCompletedAt = c.timeNow()

// The execution terminated (or has been canceled). We have to close the tracker to release resources.
// It's important to close it before computing the promise size, so that the actual size is smaller.
promise.callersCtxTracker.close()

sizeBytes := int64(len(key) + size.Of(promise))

c.onPromiseExecutionDone(ctx, key, ts, sizeBytes, promise.err)
c.onPromiseExecutionDone(ctx, key, promise.evaluationStartedAt, promise.evaluationCompletedAt, sizeBytes, promise.err)
return promise.result
}

Expand Down Expand Up @@ -318,7 +345,7 @@ func (c *PostingsForMatchersCache) evictHead() {
// onPromiseExecutionDone must be called once the execution of PostingsForMatchers promise has done.
// The input err contains details about any error that could have occurred when executing it.
// The input ts is the function call time.
func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) {
func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, startedAt, completedAt time.Time, sizeBytes int64, err error) {
span := trace.SpanFromContext(ctx)

// Call the registered hook, if any. It's used only for testing purposes.
Expand Down Expand Up @@ -348,7 +375,7 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k

c.cached.PushBack(&postingsForMatchersCachedCall{
key: key,
ts: ts,
ts: completedAt, // TODO explain why
sizeBytes: sizeBytes,
})
c.cachedBytes += sizeBytes
Expand All @@ -358,7 +385,8 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k
}

span.AddEvent("added cached value to expiry queue", trace.WithAttributes(
attribute.Stringer("timestamp", ts),
attribute.Stringer("evaluation started at", startedAt),
attribute.Stringer("evaluation completed at", completedAt),
attribute.Int64("size in bytes", sizeBytes),
attribute.Int64("cached bytes", lastCachedBytes),
))
Expand Down

0 comments on commit b826f27

Please sign in to comment.