diff --git a/tsdb/postings_for_matchers_cache.go b/tsdb/postings_for_matchers_cache.go index 3bd5dc231..58938e687 100644 --- a/tsdb/postings_for_matchers_cache.go +++ b/tsdb/postings_for_matchers_cache.go @@ -141,8 +141,12 @@ type postingsForMatcherPromise struct { // callers contexts get canceled. callersCtxTracker *contextsTracker - // Keep track of the time this promise was evaluated, so we can understand the age of this cache entry in traces. - evaluatedAt time.Time + // Keep track of the time this promise started being evaluated, so we can understand the age of this cache entry in traces. + evaluationStartedAt time.Time + + // Keep track of the time this promise completed evaluation. + // Do not access these fields until the done channel is closed. + evaluationCompletedAt time.Time // The result of the promise is stored either in cloner or err (only of the two is valued). // Do not access these fields until the done channel is closed. @@ -174,9 +178,9 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex promiseCallersCtxTracker, promiseExecCtx := newContextsTracker() ts := c.timeNow() promise := &postingsForMatcherPromise{ - evaluatedAt: ts, - done: make(chan struct{}), - callersCtxTracker: promiseCallersCtxTracker, + evaluationStartedAt: ts, + done: make(chan struct{}), + callersCtxTracker: promiseCallersCtxTracker, } // Add the caller context to the ones tracked by the new promise. @@ -200,6 +204,26 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex oldPromise := oldPromiseValue.(*postingsForMatcherPromise) + // Check if the promise already completed the execution and its TTL has not expired yet. + // If the TTL has expired, we don't want to return it, so we just recompute the postings + // on-the-fly, bypassing the cache logic. It's less performant, but more accurate, because + // avoids returning stale data. + if c.ttl > 0 { + select { + case <-oldPromise.done: + if c.timeNow().Sub(oldPromise.evaluationCompletedAt) >= c.ttl { + // The cached promise already expired, but it has not been evicted. + // TODO trace + metric + return func(ctx context.Context) (index.Postings, error) { + return c.postingsForMatchers(ctx, ix, ms...) + } + } + + default: + // The evaluation is still in-flight. We wait for it. + } + } + // Add the caller context to the ones tracked by the old promise (currently in-flight). if err := oldPromise.callersCtxTracker.add(ctx); err != nil && errors.Is(err, errContextsTrackerCanceled{}) { // We've hit a race condition happening when the "loaded" promise execution was just canceled, @@ -218,7 +242,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex span.AddEvent("using cached postingsForMatchers promise", trace.WithAttributes( attribute.String("cache_key", key), - attribute.Int64("cache_entry_evaluated_at", promise.evaluatedAt.Unix()), + attribute.Int64("cache_entry_evaluated_at", promise.evaluationStartedAt.Unix()), )) return oldPromise.result @@ -226,7 +250,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex span.AddEvent("no postingsForMatchers promise in cache, executing query", trace.WithAttributes( attribute.String("cache_key", key), - attribute.Int64("cache_entry_evaluated_at", promise.evaluatedAt.Unix()), + attribute.Int64("cache_entry_evaluated_at", promise.evaluationStartedAt.Unix()), )) // promise was stored, close its channel after fulfilment @@ -244,13 +268,16 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Contex promise.cloner = index.NewPostingsCloner(postings) } + // Keep track of when the evaluation completed. + promise.evaluationCompletedAt = c.timeNow() + // The execution terminated (or has been canceled). We have to close the tracker to release resources. // It's important to close it before computing the promise size, so that the actual size is smaller. promise.callersCtxTracker.close() sizeBytes := int64(len(key) + size.Of(promise)) - c.onPromiseExecutionDone(ctx, key, ts, sizeBytes, promise.err) + c.onPromiseExecutionDone(ctx, key, promise.evaluationStartedAt, promise.evaluationCompletedAt, sizeBytes, promise.err) return promise.result } @@ -318,7 +345,7 @@ func (c *PostingsForMatchersCache) evictHead() { // onPromiseExecutionDone must be called once the execution of PostingsForMatchers promise has done. // The input err contains details about any error that could have occurred when executing it. // The input ts is the function call time. -func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, ts time.Time, sizeBytes int64, err error) { +func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, key string, startedAt, completedAt time.Time, sizeBytes int64, err error) { span := trace.SpanFromContext(ctx) // Call the registered hook, if any. It's used only for testing purposes. @@ -348,7 +375,7 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k c.cached.PushBack(&postingsForMatchersCachedCall{ key: key, - ts: ts, + ts: completedAt, // TODO explain why sizeBytes: sizeBytes, }) c.cachedBytes += sizeBytes @@ -358,7 +385,8 @@ func (c *PostingsForMatchersCache) onPromiseExecutionDone(ctx context.Context, k } span.AddEvent("added cached value to expiry queue", trace.WithAttributes( - attribute.Stringer("timestamp", ts), + attribute.Stringer("evaluation started at", startedAt), + attribute.Stringer("evaluation completed at", completedAt), attribute.Int64("size in bytes", sizeBytes), attribute.Int64("cached bytes", lastCachedBytes), ))