Make event cache size based (#3294)
* Make event cache size based
mkolodezny authored and emrahs committed Aug 14, 2020
1 parent df11c4b commit 6c47213
Showing 11 changed files with 392 additions and 78 deletions.
17 changes: 17 additions & 0 deletions common/cache/cache.go
@@ -66,6 +66,20 @@ type Options struct {
// RemovedFunc is an optional function called when an element
// is scheduled for deletion
RemovedFunc RemovedFunc

// MaxCount controls the max capacity of the cache
// It is a required option if MaxSize is not provided
MaxCount int

// GetCacheItemSizeFunc is a function called when an item is added, to update the cache size.
// It returns 0 by default, assuming the cache is just count based
// It is a required option if MaxCount is not provided
GetCacheItemSizeFunc GetCacheItemSizeFunc

// MaxSize controls the max size in bytes of the cache and
// must be set along with GetCacheItemSizeFunc
// It is a required option if MaxCount is not provided
MaxSize uint64
}

// SimpleOptions provides options that can be used to configure SimpleCache
@@ -104,3 +118,6 @@ type Entry interface {
// CreateTime represents the time when the entry is created
CreateTime() time.Time
}

// GetCacheItemSizeFunc returns the cache item size in bytes
type GetCacheItemSizeFunc func(interface{}) uint64
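
For reference, a minimal usage sketch of the new options-based constructor. The import path and the []byte size function below are illustrative assumptions; New, Options, MaxCount, MaxSize, and GetCacheItemSizeFunc are taken from this change.

package main

import (
	"fmt"

	"github.com/uber/cadence/common/cache" // assumed import path
)

func main() {
	// Count-based cache: starts evicting once the entry count reaches MaxCount.
	countCache := cache.New(&cache.Options{MaxCount: 100})

	// Size-based cache: starts evicting once the summed item sizes exceed MaxSize.
	// Sizing values as []byte payloads is an illustrative choice, not part of this change.
	sizeCache := cache.New(&cache.Options{
		MaxSize: 1 << 20, // 1 MB budget
		GetCacheItemSizeFunc: func(v interface{}) uint64 {
			return uint64(len(v.([]byte)))
		},
	})

	countCache.Put("A", "Foo")
	sizeCache.Put("B", []byte("Bar"))
	fmt.Println(countCache.Size(), sizeCache.Size()) // 1 1
}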
85 changes: 59 additions & 26 deletions common/cache/lru.go
@@ -32,16 +32,24 @@ var (
ErrCacheFull = errors.New("Cache capacity is fully occupied with pinned elements")
)

// upper limit on the entry count to prevent unbounded growth
const cacheCountLimit = 1 << 25

// lru is a concurrent count- or size-bounded cache that evicts elements in LRU order
type (
lru struct {
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxSize int
ttl time.Duration
pin bool
rmFunc RemovedFunc
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
}

iteratorImpl struct {
@@ -126,33 +134,34 @@ func (entry *entryImpl) CreateTime() time.Time {
}

// New creates a new cache with the given options
func New(maxSize int, opts *Options) Cache {
if opts == nil {
opts = &Options{}
func New(opts *Options) Cache {
if opts == nil || (opts.MaxCount <= 0 && (opts.MaxSize <= 0 || opts.GetCacheItemSizeFunc == nil)) {
panic("Either MaxCount (count based) or " +
"MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache")
}

return &lru{
cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
maxSize: maxSize,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
}
}

// NewLRU creates a new LRU cache of the given size, setting initial capacity
// to the max size
func NewLRU(maxSize int) Cache {
return New(maxSize, nil)
}

// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity
// and a max size
func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
return New(maxSize, &Options{
InitialCapacity: initialCapacity,
})
cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0

if cache.isSizeBased {
cache.sizeFunc = opts.GetCacheItemSizeFunc
cache.maxSize = opts.MaxSize
cache.sizeByKey = make(map[interface{}]uint64, opts.InitialCapacity)
} else {
// the cache is count based if MaxSize or GetCacheItemSizeFunc is not provided
cache.maxCount = opts.MaxCount
cache.sizeFunc = func(interface{}) uint64 {
return 0
}
}
return cache
}

// Get retrieves the value stored under the given key
@@ -239,6 +248,7 @@ func (c *lru) Size() int {
// Put puts a new value associated with a given key, returning the existing value (if present)
// allowUpdate flag is used to control overwrite behavior if the value exists
func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) {
valueSize := c.sizeFunc(value)
c.mut.Lock()
defer c.mut.Unlock()

@@ -279,7 +289,8 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

c.byKey[key] = c.byAccess.PushFront(entry)
if len(c.byKey) == c.maxSize {
c.updateSizeOnAdd(key, valueSize)
for c.isCacheFull() {
oldest := c.byAccess.Back().Value.(*entryImpl)

if oldest.refCount > 0 {
@@ -301,8 +312,30 @@ func (c *lru) deleteInternal(element *list.Element) {
go c.rmFunc(entry.value)
}
delete(c.byKey, entry.key)
c.updateSizeOnDelete(entry.key)
}

func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool {
return entry.refCount == 0 && !entry.createTime.IsZero() && currentTime.After(entry.createTime.Add(c.ttl))
}

func (c *lru) isCacheFull() bool {
count := len(c.byKey)
// if the value size is greater than maxSize (should never happen), the item won't stay cached
return (!c.isSizeBased && count == c.maxCount) || c.currSize > c.maxSize || count > cacheCountLimit
}

func (c *lru) updateSizeOnAdd(key interface{}, valueSize uint64) {
if c.isSizeBased {
c.sizeByKey[key] = valueSize
// integer overflow should not happen here
c.currSize += valueSize
}
}

func (c *lru) updateSizeOnDelete(key interface{}) {
if c.isSizeBased {
c.currSize -= c.sizeByKey[key]
delete(c.sizeByKey, key)
}
}
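
With this change, the eviction loop in putInternal runs until isCacheFull reports false, so a single oversized Put can evict multiple entries, and a value larger than MaxSize is itself evicted right after insertion. A short sketch of the size-based behavior, mirroring TestLRU_SizeBased_SizeExceeded below (import path assumed):

package main

import (
	"fmt"

	"github.com/uber/cadence/common/cache" // assumed import path
)

func main() {
	// Every item costs 5 bytes against a 15-byte budget.
	c := cache.New(&cache.Options{
		MaxSize:              15,
		GetCacheItemSizeFunc: func(interface{}) uint64 { return 5 },
	})

	c.Put("A", "Foo")  // currSize = 5
	c.Put("B", "Bar")  // currSize = 10
	c.Put("C", "Cid")  // currSize = 15, within budget (eviction requires currSize > MaxSize)
	c.Put("D", "Delt") // currSize = 20 > 15: the loop evicts the LRU entry "A"

	fmt.Println(c.Size())   // 3
	fmt.Println(c.Get("A")) // <nil>
}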
144 changes: 132 additions & 12 deletions common/cache/lru_test.go
@@ -34,7 +34,7 @@ type keyType struct {
}

func TestLRU(t *testing.T) {
cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
@@ -62,6 +62,7 @@ func TestLRU(t *testing.T) {
cache.Get("C")
cache.Put("F", "Felp")
assert.Nil(t, cache.Get("D"))
assert.Equal(t, 4, cache.Size())

cache.Delete("A")
assert.Nil(t, cache.Get("A"))
@@ -74,7 +75,7 @@ func TestGenerics(t *testing.T) {
}
value := "some random value"

cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})
cache.Put(key, value)

assert.Equal(t, value, cache.Get(key))
@@ -89,8 +90,9 @@ func TestGenerics(t *testing.T) {
}

func TestLRUWithTTL(t *testing.T) {
cache := New(5, &Options{
TTL: time.Millisecond * 100,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 100,
})
cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))
@@ -100,7 +102,7 @@ func TestLRUWithTTL(t *testing.T) {
}

func TestLRUCacheConcurrentAccess(t *testing.T) {
cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})
values := map[string]string{
"A": "foo",
"B": "bar",
@@ -154,7 +156,8 @@ func TestLRUCacheConcurrentAccess(t *testing.T) {

func TestRemoveFunc(t *testing.T) {
ch := make(chan bool)
cache := New(5, &Options{
cache := New(&Options{
MaxCount: 5,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -177,8 +180,9 @@ func TestRemoveFunc(t *testing.T) {

func TestRemovedFuncWithTTL(t *testing.T) {
ch := make(chan bool)
cache := New(5, &Options{
TTL: time.Millisecond * 50,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -202,9 +206,10 @@ func TestRemovedFuncWithTTL(t *testing.T) {

func TestRemovedFuncWithTTL_Pin(t *testing.T) {
ch := make(chan bool)
cache := New(5, &Options{
TTL: time.Millisecond * 50,
Pin: true,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Pin: true,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -240,7 +245,7 @@ func TestIterator(t *testing.T) {
"D": "Delta",
}

cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})

for k, v := range expected {
cache.Put(k, v)
@@ -264,3 +269,118 @@ func TestIterator(t *testing.T) {
it.Close()
assert.Equal(t, expected, actual)
}

func TestLRU_SizeBased_SizeExceeded(t *testing.T) {
valueSize := 5
cache := New(&Options{
MaxCount: 5,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return uint64(valueSize)
},
MaxSize: 15,
})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 1, cache.Size())

cache.Put("B", "Bar")
cache.Put("C", "Cid")
cache.Put("D", "Delt")
assert.Nil(t, cache.Get("A"))
assert.Equal(t, 3, cache.Size())

assert.Equal(t, "Bar", cache.Get("B"))
assert.Equal(t, "Cid", cache.Get("C"))
assert.Equal(t, "Delt", cache.Get("D"))

cache.Put("A", "Foo2")
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 3, cache.Size())

valueSize = 15 // put large value to evict the rest in a loop
cache.Put("E", "Epsi")
assert.Nil(t, cache.Get("C"))
assert.Equal(t, "Epsi", cache.Get("E"))
assert.Nil(t, cache.Get("A"))
assert.Equal(t, 1, cache.Size())

valueSize = 25 // put large value greater than maxSize to evict everything
cache.Put("M", "Mepsi")
assert.Nil(t, cache.Get("M"))
assert.Equal(t, 0, cache.Size())
}

func TestLRU_SizeBased_CountExceeded(t *testing.T) {
cache := New(&Options{
MaxCount: 5,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return 5
},
MaxSize: 0,
})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 1, cache.Size())

cache.Put("B", "Bar")
cache.Put("C", "Cid")
cache.Put("D", "Delt")
assert.Equal(t, 4, cache.Size())

assert.Equal(t, "Bar", cache.Get("B"))
assert.Equal(t, "Cid", cache.Get("C"))
assert.Equal(t, "Delt", cache.Get("D"))

cache.Put("A", "Foo2")
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Equal(t, 4, cache.Size())

cache.Put("E", "Epsi")
assert.Nil(t, cache.Get("B"))
assert.Equal(t, "Epsi", cache.Get("E"))
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Equal(t, 4, cache.Size())
}

func TestPanicMaxCountAndSizeNotProvided(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(&Options{
TTL: time.Millisecond * 100,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return 5
},
})
}

func TestPanicMaxCountAndSizeFuncNotProvided(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(&Options{
TTL: time.Millisecond * 100,
MaxSize: 25,
})
}

func TestPanicOptionsIsNil(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(nil)
}