Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
storage/fcds: add offsetCache ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
janos committed Dec 18, 2019
1 parent db658c7 commit 26f6626
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
4 changes: 2 additions & 2 deletions storage/fcds/fcds.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type Option func(*Store)

// WithCache is an optional argument to New constructor that enables
// in memory cache of free chunk data positions in files
func WithCache(yes bool) Option {
func WithCache(yes bool, ttl time.Duration) Option {
return func(s *Store) {
if yes {
s.freeCache = newOffsetCache(shardCount)
s.freeCache = newOffsetCache(shardCount, ttl)
} else {
s.freeCache = nil
}
Expand Down
60 changes: 51 additions & 9 deletions storage/fcds/offsetcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,35 @@

package fcds

import "sync"
import (
"sync"
"time"
)

// offsetCache is a simple cache of offset integers
// by shard files.
type offsetCache struct {
m map[uint8]map[int64]struct{}
mu sync.RWMutex
m map[uint8]map[int64]time.Time
ttl time.Duration
mu sync.RWMutex
quit chan struct{}
quitOnce sync.Once
}

// newOffsetCache constructs offsetCache for a fixed number of shards.
func newOffsetCache(shardCount uint8) (c *offsetCache) {
m := make(map[uint8]map[int64]struct{})
func newOffsetCache(shardCount uint8, ttl time.Duration) (c *offsetCache) {
m := make(map[uint8]map[int64]time.Time)
for i := uint8(0); i < shardCount; i++ {
m[i] = make(map[int64]struct{})
m[i] = make(map[int64]time.Time)
}
return &offsetCache{
m: m,
c = &offsetCache{
m: m,
quit: make(chan struct{}),
}
if ttl > 0 {
go c.cleanup(30 * time.Second)
}
return c
}

// get returns a free offset in a shard. If the returned
Expand All @@ -52,7 +63,7 @@ func (c *offsetCache) get(shard uint8) (offset int64) {
// set sets a free offset for a shard file.
func (c *offsetCache) set(shard uint8, offset int64) {
c.mu.Lock()
c.m[shard][offset] = struct{}{}
c.m[shard][offset] = time.Now().Add(c.ttl)
c.mu.Unlock()
}

Expand All @@ -62,3 +73,34 @@ func (c *offsetCache) remove(shard uint8, offset int64) {
delete(c.m[shard], offset)
c.mu.Unlock()
}

// close stops parallel processing created
// by offsetCache.
func (c *offsetCache) close() {
c.quitOnce.Do(func() {
close(c.quit)
})
}

func (c *offsetCache) cleanup(period time.Duration) {
ticker := time.NewTicker(period)
defer ticker.Stop()

for {
select {
case <-ticker.C:
now := time.Now()
c.mu.Lock()
for _, s := range c.m {
for offset, expiration := range s {
if now.After(expiration) {
delete(s, offset)
}
}
}
c.mu.Unlock()
case <-c.quit:
return
}
}
}
3 changes: 2 additions & 1 deletion storage/fcds/test/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"sync"
"testing"
"time"

"github.com/ethersphere/swarm/chunk"
chunktesting "github.com/ethersphere/swarm/chunk/testing"
Expand Down Expand Up @@ -297,7 +298,7 @@ func NewFCDSStore(t *testing.T, path string, metaStore fcds.MetaStore) (s *fcds.
t.Fatal(err)
}

s, err = fcds.New(path, chunk.DefaultSize, metaStore, fcds.WithCache(!*noCacheFlag))
s, err = fcds.New(path, chunk.DefaultSize, metaStore, fcds.WithCache(!*noCacheFlag, time.Hour))
if err != nil {
os.RemoveAll(path)
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
filepath.Join(path, "data"),
chunk.DefaultSize+8, // chunk data has additional 8 bytes prepended
metaStore,
fcds.WithCache(true),
fcds.WithCache(true, time.Hour),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 26f6626

Please sign in to comment.