diff --git a/storage/fcds/fcds.go b/storage/fcds/fcds.go index 94227f8536..86c93f6826 100644 --- a/storage/fcds/fcds.go +++ b/storage/fcds/fcds.go @@ -70,10 +70,10 @@ type Option func(*Store) // WithCache is an optional argument to New constructor that enables // in memory cache of free chunk data positions in files -func WithCache(yes bool) Option { +func WithCache(yes bool, ttl time.Duration) Option { return func(s *Store) { if yes { - s.freeCache = newOffsetCache(shardCount) + s.freeCache = newOffsetCache(shardCount, ttl) } else { s.freeCache = nil } diff --git a/storage/fcds/offsetcache.go b/storage/fcds/offsetcache.go index 66311fdbc1..6a0ce0cfa9 100644 --- a/storage/fcds/offsetcache.go +++ b/storage/fcds/offsetcache.go @@ -16,24 +16,35 @@ package fcds -import "sync" +import ( + "sync" + "time" +) // offsetCache is a simple cache of offset integers // by shard files. type offsetCache struct { - m map[uint8]map[int64]struct{} - mu sync.RWMutex + m map[uint8]map[int64]time.Time + ttl time.Duration + mu sync.RWMutex + quit chan struct{} + quitOnce sync.Once } // newOffsetCache constructs offsetCache for a fixed number of shards. -func newOffsetCache(shardCount uint8) (c *offsetCache) { - m := make(map[uint8]map[int64]struct{}) +func newOffsetCache(shardCount uint8, ttl time.Duration) (c *offsetCache) { + m := make(map[uint8]map[int64]time.Time) for i := uint8(0); i < shardCount; i++ { - m[i] = make(map[int64]struct{}) + m[i] = make(map[int64]time.Time) } - return &offsetCache{ - m: m, + c = &offsetCache{ + m: m, + quit: make(chan struct{}), } + if ttl > 0 { + go c.cleanup(30 * time.Second) + } + return c } // get returns a free offset in a shard. If the returned @@ -52,7 +63,7 @@ func (c *offsetCache) get(shard uint8) (offset int64) { // set sets a free offset for a shard file. func (c *offsetCache) set(shard uint8, offset int64) { c.mu.Lock() - c.m[shard][offset] = struct{}{} + c.m[shard][offset] = time.Now().Add(c.ttl) c.mu.Unlock() } @@ -62,3 +73,34 @@ func (c *offsetCache) remove(shard uint8, offset int64) { delete(c.m[shard], offset) c.mu.Unlock() } + +// close stops parallel processing created +// by offsetCache. +func (c *offsetCache) close() { + c.quitOnce.Do(func() { + close(c.quit) + }) +} + +func (c *offsetCache) cleanup(period time.Duration) { + ticker := time.NewTicker(period) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + c.mu.Lock() + for _, s := range c.m { + for offset, expiration := range s { + if now.After(expiration) { + delete(s, offset) + } + } + } + c.mu.Unlock() + case <-c.quit: + return + } + } +} diff --git a/storage/fcds/test/store.go b/storage/fcds/test/store.go index f7dcfa9c69..9e7e68b537 100644 --- a/storage/fcds/test/store.go +++ b/storage/fcds/test/store.go @@ -25,6 +25,7 @@ import ( "os" "sync" "testing" + "time" "github.com/ethersphere/swarm/chunk" chunktesting "github.com/ethersphere/swarm/chunk/testing" @@ -297,7 +298,7 @@ func NewFCDSStore(t *testing.T, path string, metaStore fcds.MetaStore) (s *fcds. t.Fatal(err) } - s, err = fcds.New(path, chunk.DefaultSize, metaStore, fcds.WithCache(!*noCacheFlag)) + s, err = fcds.New(path, chunk.DefaultSize, metaStore, fcds.WithCache(!*noCacheFlag, time.Hour)) if err != nil { os.RemoveAll(path) t.Fatal(err) diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index 35631c8524..fadd42002c 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -230,7 +230,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { filepath.Join(path, "data"), chunk.DefaultSize+8, // chunk data has additional 8 bytes prepended metaStore, - fcds.WithCache(true), + fcds.WithCache(true, time.Hour), ) if err != nil { return nil, err