Make event cache size based #3294

Merged
32 commits merged into master from cache_size on Jun 1, 2020

Commits (32)
03cbf16  Make event cache size based (mkolodezny, May 28, 2020)
d3d701e  refactor (mkolodezny, May 29, 2020)
5a96447  refactor (mkolodezny, May 29, 2020)
02f9aa0  refactor (mkolodezny, May 29, 2020)
2d18bc0  refactor (mkolodezny, May 29, 2020)
635bc45  refactor (mkolodezny, May 29, 2020)
1e05885  Implement simple RWMutex cache to be used in domainCache (#3273) (andrewjdawson2016, May 26, 2020)
f18002d  Add invariant manager (#3263) (andrewjdawson2016, May 26, 2020)
34af994  Transfer queue processor base V2 (#3278) (yycptt, May 27, 2020)
751ac69  Priority Task Processor Improvements (#3284) (yycptt, May 27, 2020)
5ba14af  Add filter for pending active task to redispatch (#3279) (yux0, May 27, 2020)
825fe05  Failover marker persistence (#3274) (yux0, May 27, 2020)
3e25f4e  Scanner impl (#3286) (andrewjdawson2016, May 28, 2020)
5e1df2b  Improve processing queue split policy (#3287) (yycptt, May 28, 2020)
10479a2  Wire up multi-cursor transfer queue processor implementation (#3285) (yycptt, May 29, 2020)
f739cee  Make event cache size based (mkolodezny, May 28, 2020)
634db6e  refactor (mkolodezny, May 29, 2020)
791ecb8  Merge branch 'master' into cache_size (mkolodezny, May 29, 2020)
c74f710  refactor (mkolodezny, May 29, 2020)
decac58  rebased (mkolodezny, May 29, 2020)
f0c2524  more unit tests (mkolodezny, May 29, 2020)
37659f1  more tests (mkolodezny, May 29, 2020)
1c992eb  more tests (mkolodezny, May 29, 2020)
b1b3246  more tests (mkolodezny, May 29, 2020)
04b0f0c  more tests (mkolodezny, May 29, 2020)
a9710c2  addressed comments (mkolodezny, May 31, 2020)
dec402e  adressed comments (mkolodezny, May 31, 2020)
8786310  addressed comments (mkolodezny, May 31, 2020)
35de605  addressed comments (mkolodezny, Jun 1, 2020)
ae869fd  addressed comments (mkolodezny, Jun 1, 2020)
367a0a0  refactor (mkolodezny, Jun 1, 2020)
3373a0e  Merge branch 'master' into cache_size (mkolodezny, Jun 1, 2020)
17 changes: 17 additions & 0 deletions common/cache/cache.go
@@ -66,6 +66,20 @@ type Options struct {
// RemovedFunc is an optional function called when an element
// is scheduled for deletion
RemovedFunc RemovedFunc

// MaxCount controls the max capacity of the cache.
// It is a required option if MaxSize is not provided.
MaxCount int

// GetCacheItemSizeFunc is a function called upon adding an item to update the cache size.
// It returns 0 by default, assuming the cache is purely count based.
// It is a required option if MaxCount is not provided.
GetCacheItemSizeFunc GetCacheItemSizeFunc

// MaxSize controls the max size in bytes of the cache and must be set
// along with GetCacheItemSizeFunc.
// It is a required option if MaxCount is not provided.
MaxSize uint64
}

// SimpleOptions provides options that can be used to configure SimpleCache
@@ -104,3 +118,6 @@ type Entry interface {
// CreateTime represents the time when the entry is created
CreateTime() time.Time
}

// GetCacheItemSizeFunc returns the cache item size in bytes
type GetCacheItemSizeFunc func(interface{}) uint64
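For illustration, here is a minimal sketch of constructing both flavors of the cache against this Options API. The variable names and the string-length size function are invented for the example; only New, Options, MaxCount, MaxSize, and GetCacheItemSizeFunc come from the PR itself:

	// Count based: evictions keep the number of entries below MaxCount.
	countCache := cache.New(&cache.Options{
		MaxCount: 1000,
	})

	// Size based: evictions keep the tracked byte total at or below MaxSize.
	sizeCache := cache.New(&cache.Options{
		MaxSize: 64 * 1024 * 1024, // hypothetical 64 MB budget
		GetCacheItemSizeFunc: func(v interface{}) uint64 {
			// hypothetical size function measuring string values
			return uint64(len(v.(string)))
		},
	})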
85 changes: 59 additions & 26 deletions common/cache/lru.go
@@ -32,16 +32,24 @@ var (
ErrCacheFull = errors.New("Cache capacity is fully occupied with pinned elements")
)

// upper limit on the entry count to prevent unbounded growth
const cacheCountLimit = 1 << 25

// lru is a concurrent fixed size cache that evicts elements in lru order
type (
lru struct {
- mut sync.Mutex
- byAccess *list.List
- byKey map[interface{}]*list.Element
- maxSize int
- ttl time.Duration
- pin bool
- rmFunc RemovedFunc
mut sync.Mutex
byAccess *list.List
byKey map[interface{}]*list.Element
maxCount int
ttl time.Duration
pin bool
rmFunc RemovedFunc
sizeFunc GetCacheItemSizeFunc
maxSize uint64
currSize uint64
sizeByKey map[interface{}]uint64
isSizeBased bool
}

iteratorImpl struct {
@@ -126,33 +134,34 @@ func (entry *entryImpl) CreateTime() time.Time {
}

// New creates a new cache with the given options
- func New(maxSize int, opts *Options) Cache {
- if opts == nil {
- opts = &Options{}
func New(opts *Options) Cache {
if opts == nil || (opts.MaxCount <= 0 && (opts.MaxSize <= 0 || opts.GetCacheItemSizeFunc == nil)) {
panic("Either MaxCount (count based) or " +
"MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache")
}

- return &lru{
cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
- maxSize: maxSize,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
}
- }
-
- // NewLRU creates a new LRU cache of the given size, setting initial capacity
- // to the max size
- func NewLRU(maxSize int) Cache {
- return New(maxSize, nil)
- }
-
- // NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity
- // and a max size
- func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache {
- return New(maxSize, &Options{
- InitialCapacity: initialCapacity,
- })
cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0

if cache.isSizeBased {
cache.sizeFunc = opts.GetCacheItemSizeFunc
cache.maxSize = opts.MaxSize
cache.sizeByKey = make(map[interface{}]uint64, opts.InitialCapacity)
} else {
// cache is count based if max size and sizeFunc are not provided
cache.maxCount = opts.MaxCount
cache.sizeFunc = func(interface{}) uint64 {
return 0
}
}
return cache
}

A review thread was attached to the `cache := &lru{` line:

Contributor: To me, size and count are either/or: if the cache is size based, we don't care about the count, as long as it stays within the size limit.

mkolodezny (author), Jun 1, 2020: Agree. If maxSize is provided then the cache is size based only, and count based otherwise. Even when size based, let's still check an upper limit on the count, to prevent unbounded growth for safety's sake.

// Get retrieves the value stored under the given key
@@ -239,6 +248,7 @@ func (c *lru) Size() int {
// Put puts a new value associated with a given key, returning the existing value (if present)
// allowUpdate flag is used to control overwrite behavior if the value exists
func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) {
valueSize := c.sizeFunc(value)
c.mut.Lock()
defer c.mut.Unlock()

@@ -279,7 +289,8 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

c.byKey[key] = c.byAccess.PushFront(entry)
- if len(c.byKey) == c.maxSize {
c.updateSizeOnAdd(key, valueSize)
for c.isCacheFull() {
oldest := c.byAccess.Back().Value.(*entryImpl)

if oldest.refCount > 0 {
@@ -301,8 +312,30 @@ func (c *lru) deleteInternal(element *list.Element) {
go c.rmFunc(entry.value)
}
delete(c.byKey, entry.key)
c.updateSizeOnDelete(entry.key)
}

func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool {
return entry.refCount == 0 && !entry.createTime.IsZero() && currentTime.After(entry.createTime.Add(c.ttl))
}

func (c *lru) isCacheFull() bool {
count := len(c.byKey)
// if the value size is greater than maxSize (should never happen), the item will not remain cached
return (!c.isSizeBased && count == c.maxCount) || c.currSize > c.maxSize || count > cacheCountLimit
}

func (c *lru) updateSizeOnAdd(key interface{}, valueSize uint64) {
if c.isSizeBased {
c.sizeByKey[key] = valueSize
// integer overflow should not happen here
c.currSize += uint64(valueSize)
}
}

func (c *lru) updateSizeOnDelete(key interface{}) {
if c.isSizeBased {
c.currSize -= uint64(c.sizeByKey[key])
delete(c.sizeByKey, key)
}
}
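To make the eviction loop above concrete, here is a small, self-contained sketch of the size-based behavior. It is not part of the PR: the keys and values are invented, and the import path assumes Cadence's common/cache package.

	package main

	import (
		"fmt"

		"github.com/uber/cadence/common/cache"
	)

	func main() {
		// MaxSize budget of 10 "bytes"; each item's size is the length of its string value.
		c := cache.New(&cache.Options{
			MaxSize: 10,
			GetCacheItemSizeFunc: func(v interface{}) uint64 {
				return uint64(len(v.(string)))
			},
		})

		c.Put("A", "12345") // tracked size: 5
		c.Put("B", "1234")  // tracked size: 9
		c.Put("C", "12")    // tracked size: 11 > 10, so the LRU entry "A" is evicted

		fmt.Println(c.Get("A")) // <nil>
		fmt.Println(c.Size())   // 2
	}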
144 changes: 132 additions & 12 deletions common/cache/lru_test.go
@@ -34,7 +34,7 @@ type keyType struct {
}

func TestLRU(t *testing.T) {
- cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
@@ -62,6 +62,7 @@ func TestLRU(t *testing.T) {
cache.Get("C")
cache.Put("F", "Felp")
assert.Nil(t, cache.Get("D"))
assert.Equal(t, 4, cache.Size())

cache.Delete("A")
assert.Nil(t, cache.Get("A"))
@@ -74,7 +75,7 @@ func TestGenerics(t *testing.T) {
}
value := "some random value"

- cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})
cache.Put(key, value)

assert.Equal(t, value, cache.Get(key))
@@ -89,8 +90,9 @@
}

func TestLRUWithTTL(t *testing.T) {
- cache := New(5, &Options{
- TTL: time.Millisecond * 100,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 100,
})
cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))
@@ -100,7 +102,7 @@ func TestLRUWithTTL(t *testing.T) {
}

func TestLRUCacheConcurrentAccess(t *testing.T) {
- cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})
values := map[string]string{
"A": "foo",
"B": "bar",
@@ -154,7 +156,8 @@ func TestLRUCacheConcurrentAccess(t *testing.T) {

func TestRemoveFunc(t *testing.T) {
ch := make(chan bool)
- cache := New(5, &Options{
cache := New(&Options{
MaxCount: 5,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -177,8 +180,9 @@ func TestRemoveFunc(t *testing.T) {

func TestRemovedFuncWithTTL(t *testing.T) {
ch := make(chan bool)
- cache := New(5, &Options{
- TTL: time.Millisecond * 50,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -202,9 +206,10 @@ func TestRemovedFuncWithTTL(t *testing.T) {

func TestRemovedFuncWithTTL_Pin(t *testing.T) {
ch := make(chan bool)
- cache := New(5, &Options{
- TTL: time.Millisecond * 50,
- Pin: true,
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Pin: true,
RemovedFunc: func(i interface{}) {
_, ok := i.(*testing.T)
assert.True(t, ok)
@@ -240,7 +245,7 @@ func TestIterator(t *testing.T) {
"D": "Delta",
}

- cache := NewLRU(5)
cache := New(&Options{MaxCount: 5})

for k, v := range expected {
cache.Put(k, v)
@@ -264,3 +269,118 @@
it.Close()
assert.Equal(t, expected, actual)
}

func TestLRU_SizeBased_SizeExceeded(t *testing.T) {
valueSize := 5
cache := New(&Options{
MaxCount: 5,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return uint64(valueSize)
},
MaxSize: 15,
})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 1, cache.Size())

cache.Put("B", "Bar")
cache.Put("C", "Cid")
cache.Put("D", "Delt")
assert.Nil(t, cache.Get("A"))
assert.Equal(t, 3, cache.Size())

assert.Equal(t, "Bar", cache.Get("B"))
assert.Equal(t, "Cid", cache.Get("C"))
assert.Equal(t, "Delt", cache.Get("D"))

cache.Put("A", "Foo2")
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 3, cache.Size())

valueSize = 15 // put large value to evict the rest in a loop
cache.Put("E", "Epsi")
assert.Nil(t, cache.Get("C"))
assert.Equal(t, "Epsi", cache.Get("E"))
assert.Nil(t, cache.Get("A"))
assert.Equal(t, 1, cache.Size())

valueSize = 25 // put large value greater than maxSize to evict everything
cache.Put("M", "Mepsi")
assert.Nil(t, cache.Get("M"))
assert.Equal(t, 0, cache.Size())
}

func TestLRU_SizeBased_CountExceeded(t *testing.T) {
cache := New(&Options{
MaxCount: 5,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return 5
},
MaxSize: 0,
})

cache.Put("A", "Foo")
assert.Equal(t, "Foo", cache.Get("A"))
assert.Nil(t, cache.Get("B"))
assert.Equal(t, 1, cache.Size())

cache.Put("B", "Bar")
cache.Put("C", "Cid")
cache.Put("D", "Delt")
assert.Equal(t, 4, cache.Size())

assert.Equal(t, "Bar", cache.Get("B"))
assert.Equal(t, "Cid", cache.Get("C"))
assert.Equal(t, "Delt", cache.Get("D"))

cache.Put("A", "Foo2")
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Equal(t, 4, cache.Size())

cache.Put("E", "Epsi")
assert.Nil(t, cache.Get("B"))
assert.Equal(t, "Epsi", cache.Get("E"))
assert.Equal(t, "Foo2", cache.Get("A"))
assert.Equal(t, 4, cache.Size())
}

func TestPanicMaxCountAndSizeNotProvided(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(&Options{
TTL: time.Millisecond * 100,
GetCacheItemSizeFunc: func(interface{}) uint64 {
return 5
},
})
}

func TestPanicMaxCountAndSizeFuncNotProvided(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(&Options{
TTL: time.Millisecond * 100,
MaxSize: 25,
})
}

func TestPanicOptionsIsNil(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The LRU was initialized without panic")
}
}()

New(nil)
}
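The motivating change behind this PR is making the history event cache size based rather than count based. A hypothetical GetCacheItemSizeFunc for that kind of use case might measure the serialized payload of the cached value; the historyEvent type and its data field below are invented for illustration and are not the PR's actual event type:

	// historyEvent is a stand-in for whatever value type the event cache stores.
	type historyEvent struct {
		data []byte
	}

	// eventSize reports the approximate memory footprint of a cached event.
	func eventSize(v interface{}) uint64 {
		e, ok := v.(*historyEvent)
		if !ok {
			return 0
		}
		return uint64(len(e.data))
	}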