Skip to content

Commit

Permalink
allow to answer with stale cache entry in case of upstream server err…
Browse files Browse the repository at this point in the history
…or (#45)
  • Loading branch information
corneliusludmann authored Jul 30, 2021
1 parent 53f6c12 commit b904cb7
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 14 deletions.
14 changes: 8 additions & 6 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func getKey(cacheKeyTemplate string, r *http.Request) string {
}

// Get returns the cached response
func (h *HTTPCache) Get(key string, request *http.Request) (*Entry, bool) {
func (h *HTTPCache) Get(key string, request *http.Request, includeStale bool) (*Entry, bool) {
b := h.getBucketIndexForKey(key)
h.entriesLock[b].RLock()
defer h.entriesLock[b].RUnlock()
Expand All @@ -447,7 +447,7 @@ func (h *HTTPCache) Get(key string, request *http.Request) (*Entry, bool) {
}

for _, entry := range previousEntries {
if entry.IsFresh() && matchVary(request, entry) {
if (entry.IsFresh() || includeStale) && matchVary(request, entry) {
return entry, true
}
}
Expand Down Expand Up @@ -498,14 +498,14 @@ func (h *HTTPCache) Del(key string) error {
}

// Put adds the entry in the cache
func (h *HTTPCache) Put(request *http.Request, entry *Entry) {
func (h *HTTPCache) Put(request *http.Request, entry *Entry, config *Config) {
key := entry.Key()
bucket := h.getBucketIndexForKey(key)

h.entriesLock[bucket].Lock()
defer h.entriesLock[bucket].Unlock()

h.scheduleCleanEntry(entry)
h.scheduleCleanEntry(entry, config.StaleMaxAge)

for i, previousEntry := range h.entries[bucket][key] {
if matchVary(entry.Request, previousEntry) {
Expand Down Expand Up @@ -571,9 +571,11 @@ func (h *HTTPCache) cleanEntry(entry *Entry) error {
return nil
}

func (h *HTTPCache) scheduleCleanEntry(entry *Entry) {
func (h *HTTPCache) scheduleCleanEntry(entry *Entry, staleMaxAge time.Duration) {
go func(entry *Entry) {
time.Sleep(entry.expiration.Sub(time.Now()))
expiration := entry.expiration
expiration = expiration.Add(staleMaxAge)
time.Sleep(expiration.Sub(time.Now()))
h.cleanEntry(entry)
}(entry)
}
8 changes: 4 additions & 4 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (suite *HTTPCacheTestSuite) SetupSuite() {

func (suite *HTTPCacheTestSuite) TestGetNonExistEntry() {
req := makeRequest("/", http.Header{})
entry, exists := suite.cache.Get("abc", req)
entry, exists := suite.cache.Get("abc", req, false)
suite.Nil(entry)
suite.False(exists)
}
Expand All @@ -238,9 +238,9 @@ func (suite *HTTPCacheTestSuite) TestGetExistEntry() {
req := makeRequest("/", http.Header{})
res := makeResponse(200, http.Header{})
entry := NewEntry("hello", req, res, suite.config)
suite.cache.Put(req, entry)
suite.cache.Put(req, entry, suite.config)

prevEntry, exists := suite.cache.Get("hello", req)
prevEntry, exists := suite.cache.Get("hello", req, false)
suite.Equal(prevEntry, entry)
suite.True(exists)
}
Expand All @@ -251,7 +251,7 @@ func (suite *HTTPCacheTestSuite) TestCleanEntry() {
key := "friday"

entry := NewEntry(key, req, res, suite.config)
suite.cache.Put(req, entry)
suite.cache.Put(req, entry, suite.config)

keyInKeys := false
keys := suite.cache.Keys()
Expand Down
15 changes: 15 additions & 0 deletions caddyfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
defaultcacheBucketsNum = 256
defaultCacheMaxMemorySize = GB // default is 1 GB
defaultRedisConnectionSetting = "localhost:6379 0"
defaultStaleMaxAge = time.Duration(0)
defaultCacheKeyTemplate = "{http.request.method} {http.request.host}{http.request.uri.path}?{http.request.uri.query}"
// Note: prevent character space in the key
// the key is refereced from github.com/caddyserver/caddy/v2/modules/caddyhttp.addHTTPVarsToReplacer
Expand All @@ -66,6 +67,7 @@ const (
// the following are keys for extensions
keyDistributed = "distributed"
keyInfluxLog = "influxlog"
keyStaleMaxAge = "stale_max_age"
)

func init() {
Expand All @@ -85,6 +87,7 @@ type Config struct {
Path string `json:"path,omitempty"`
CacheKeyTemplate string `json:"cache_key_template,omitempty"`
RedisConnectionSetting string `json:"redis_connection_setting,omitempty"`
StaleMaxAge time.Duration `json:"stale_max_age,omitempty"`
}

func getDefaultConfig() *Config {
Expand All @@ -100,6 +103,7 @@ func getDefaultConfig() *Config {
Type: defaultCacheType,
CacheKeyTemplate: defaultCacheKeyTemplate,
RedisConnectionSetting: defaultRedisConnectionSetting,
StaleMaxAge: defaultStaleMaxAge,
}
}

Expand Down Expand Up @@ -250,6 +254,17 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {

h.DistributedRaw = caddyconfig.JSONModuleObject(unm, "distributed", "consul", nil)

case keyStaleMaxAge:
if len(args) != 1 {
return d.Err("Invalid usage of stale_max_age in cache config.")
}

duration, err := time.ParseDuration(args[0])
if err != nil {
return d.Err(fmt.Sprintf("%s:%s, %s", keyStaleMaxAge, "Invalid duration ", parameter))
}
config.StaleMaxAge = duration

default:
return d.Err("Unknown cache parameter: " + parameter)
}
Expand Down
2 changes: 1 addition & 1 deletion endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c cachePurge) handleShowCache(w http.ResponseWriter, r *http.Request) erro
key := helper.TrimBy(r.URL.Path, "/", 2)
cache := getHandlerCache()

entry, exists := cache.Get(key, r)
entry, exists := cache.Get(key, r, false)
if exists {
err = entry.WriteBodyTo(w)
}
Expand Down
21 changes: 18 additions & 3 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
lock := h.URLLocks.Acquire(key)
defer lock.Unlock()

previousEntry, exists := h.Cache.Get(key, r)
previousEntry, exists := h.Cache.Get(key, r, false)

// First case: CACHE HIT
// The response exists in cache and is public
Expand All @@ -365,7 +365,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
return caddyhttp.Error(http.StatusInternalServerError, err)
}

h.Cache.Put(r, entry)
h.Cache.Put(r, entry, h.Config)
response.Close()

// NOTE: should set the content-length to the header manually when distributed
Expand Down Expand Up @@ -396,6 +396,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
entry, err := h.fetchUpstream(r, next)
upstreamDuration = time.Since(t)

if entry.Response.Code >= 500 {
// using stale entry when available
previousEntry, exists := h.Cache.Get(key, r, true)

if exists && previousEntry.isPublic {
if err := h.respond(w, previousEntry, cacheHit); err == nil {
return nil
} else if _, ok := err.(backends.NoPreCollectError); ok {
// if the err is No pre collect, just return nil
w.WriteHeader(previousEntry.Response.Code)
return nil
}
}
}

if err != nil {
return caddyhttp.Error(entry.Response.Code, err)
}
Expand All @@ -407,7 +422,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
return caddyhttp.Error(http.StatusInternalServerError, err)
}

h.Cache.Put(r, entry)
h.Cache.Put(r, entry, h.Config)
err = h.respond(w, entry, cacheMiss)
if err != nil {
h.logger.Error("cache handler", zap.Error(err))
Expand Down
5 changes: 5 additions & 0 deletions readme.org
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@
*** default_max_age
The cache's expiration time.

*** stale_max_age
The duration that a cache entry is kept in the cache, even though it has already expired. The default duration is =0=.

If this duration is > 0 and the upstream server answers with an HTTP status code >= 500 (server error) this plugin checks whether there is still an expired (stale) entry from a previous, successful call in the cache. In that case, this stale entry is used to answer instead of the 5xx response.

*** match_header
only the req's header match the condtions
ex.
Expand Down

0 comments on commit b904cb7

Please sign in to comment.