diff --git a/common/cache/cache.go b/common/cache/cache.go index 1b71a68cdfe..615f3cb3e6f 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -66,6 +66,20 @@ type Options struct { // RemovedFunc is an optional function called when an element // is scheduled for deletion RemovedFunc RemovedFunc + + // MaxCount controls the max capacity of the cache + // It is a required option if MaxSize is not provided + MaxCount int + + // GetCacheItemSizeFunc is a function called upon adding an item to update the cache size. + // It defaults to a function that returns 0, in which case the cache is purely count based + // It is a required option if MaxCount is not provided + GetCacheItemSizeFunc GetCacheItemSizeFunc + + // MaxSize is optional and must be set along with GetCacheItemSizeFunc + // to control the max size in bytes of the cache + // It is a required option if MaxCount is not provided + MaxSize uint64 } // SimpleOptions provides options that can be used to configure SimpleCache @@ -104,3 +118,6 @@ type Entry interface { // CreateTime represents the time when the entry is created CreateTime() time.Time } + +// GetCacheItemSizeFunc returns the cache item size in bytes +type GetCacheItemSizeFunc func(interface{}) uint64 diff --git a/common/cache/lru.go b/common/cache/lru.go index f50e33b7e3c..6f372ec35fd 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -32,16 +32,24 @@ var ( ErrCacheFull = errors.New("Cache capacity is fully occupied with pinned elements") ) +// upper limit on the number of entries to prevent unbounded growth +const cacheCountLimit = 1 << 25 + // lru is a concurrent fixed size cache that evicts elements in lru order type ( lru struct { - mut sync.Mutex - byAccess *list.List - byKey map[interface{}]*list.Element - maxSize int - ttl time.Duration - pin bool - rmFunc RemovedFunc + mut sync.Mutex + byAccess *list.List + byKey map[interface{}]*list.Element + maxCount int + ttl time.Duration + pin bool + rmFunc RemovedFunc + sizeFunc GetCacheItemSizeFunc + maxSize uint64 + currSize uint64 + sizeByKey map[interface{}]uint64 + isSizeBased bool } iteratorImpl struct { @@ -126,33 +134,34 @@ func (entry *entryImpl) CreateTime() time.Time { } // New creates a new cache with the given options -func New(maxSize int, opts *Options) Cache { - if opts == nil { - opts = &Options{} +func New(opts *Options) Cache { + if opts == nil || (opts.MaxCount <= 0 && (opts.MaxSize <= 0 || opts.GetCacheItemSizeFunc == nil)) { + panic("Either MaxCount (count based) or " + + "MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache") } - return &lru{ + cache := &lru{ byAccess: list.New(), byKey: make(map[interface{}]*list.Element, opts.InitialCapacity), ttl: opts.TTL, - maxSize: maxSize, pin: opts.Pin, rmFunc: opts.RemovedFunc, } -} - -// NewLRU creates a new LRU cache of the given size, setting initial capacity -// to the max size -func NewLRU(maxSize int) Cache { - return New(maxSize, nil) -} -// NewLRUWithInitialCapacity creates a new LRU cache with an initial capacity -// and a max size -func NewLRUWithInitialCapacity(initialCapacity, maxSize int) Cache { - return New(maxSize, &Options{ - InitialCapacity: initialCapacity, - }) + cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0 + + if cache.isSizeBased { + cache.sizeFunc = opts.GetCacheItemSizeFunc + cache.maxSize = opts.MaxSize + cache.sizeByKey = make(map[interface{}]uint64, opts.InitialCapacity) + } else { + // cache is count based if max size and sizeFunc are not provided + cache.maxCount = opts.MaxCount + cache.sizeFunc =
func(interface{}) uint64 { + return 0 + } + } + return cache } // Get retrieves the value stored under the given key @@ -239,6 +248,7 @@ func (c *lru) Size() int { // Put puts a new value associated with a given key, returning the existing value (if present) // allowUpdate flag is used to control overwrite behavior if the value exists func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) (interface{}, error) { + valueSize := c.sizeFunc(value) c.mut.Lock() defer c.mut.Unlock() @@ -279,7 +289,8 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) } c.byKey[key] = c.byAccess.PushFront(entry) - if len(c.byKey) == c.maxSize { + c.updateSizeOnAdd(key, valueSize) + for c.isCacheFull() { oldest := c.byAccess.Back().Value.(*entryImpl) if oldest.refCount > 0 { @@ -301,8 +312,30 @@ func (c *lru) deleteInternal(element *list.Element) { go c.rmFunc(entry.value) } delete(c.byKey, entry.key) + c.updateSizeOnDelete(entry.key) } func (c *lru) isEntryExpired(entry *entryImpl, currentTime time.Time) bool { return entry.refCount == 0 && !entry.createTime.IsZero() && currentTime.After(entry.createTime.Add(c.ttl)) } + +func (c *lru) isCacheFull() bool { + count := len(c.byKey) + // if the value size is greater than maxSize (which should never happen), the item won't be cached + return (!c.isSizeBased && count == c.maxCount) || c.currSize > c.maxSize || count > cacheCountLimit +} + +func (c *lru) updateSizeOnAdd(key interface{}, valueSize uint64) { + if c.isSizeBased { + c.sizeByKey[key] = valueSize + // integer overflow should not happen here + c.currSize += uint64(valueSize) + } +} + +func (c *lru) updateSizeOnDelete(key interface{}) { + if c.isSizeBased { + c.currSize -= uint64(c.sizeByKey[key]) + delete(c.sizeByKey, key) + } +} diff --git a/common/cache/lru_test.go b/common/cache/lru_test.go index fa51aaeb758..3ba0d1508c2 100644 --- a/common/cache/lru_test.go +++ b/common/cache/lru_test.go @@ -34,7 +34,7 @@ type keyType struct { } func TestLRU(t *testing.T) { - cache := NewLRU(5) + cache := New(&Options{MaxCount: 5}) cache.Put("A", "Foo") assert.Equal(t, "Foo", cache.Get("A")) @@ -62,6 +62,7 @@ func TestLRU(t *testing.T) { cache.Get("C") cache.Put("F", "Felp") assert.Nil(t, cache.Get("D")) + assert.Equal(t, 4, cache.Size()) cache.Delete("A") assert.Nil(t, cache.Get("A")) @@ -74,7 +75,7 @@ func TestGenerics(t *testing.T) { } value := "some random value" - cache := NewLRU(5) + cache := New(&Options{MaxCount: 5}) cache.Put(key, value) assert.Equal(t, value, cache.Get(key)) @@ -89,8 +90,9 @@ func TestGenerics(t *testing.T) { } func TestLRUWithTTL(t *testing.T) { - cache := New(5, &Options{ - TTL: time.Millisecond * 100, + cache := New(&Options{ + MaxCount: 5, + TTL: time.Millisecond * 100, }) cache.Put("A", "foo") assert.Equal(t, "foo", cache.Get("A")) @@ -100,7 +102,7 @@ func TestLRUWithTTL(t *testing.T) { } func TestLRUCacheConcurrentAccess(t *testing.T) { - cache := NewLRU(5) + cache := New(&Options{MaxCount: 5}) values := map[string]string{ "A": "foo", "B": "bar", @@ -154,7 +156,8 @@ func TestLRUCacheConcurrentAccess(t *testing.T) { func TestRemoveFunc(t *testing.T) { ch := make(chan bool) - cache := New(5, &Options{ + cache := New(&Options{ + MaxCount: 5, RemovedFunc: func(i interface{}) { _, ok := i.(*testing.T) assert.True(t, ok) @@ -177,8 +180,9 @@ func TestRemoveFunc(t *testing.T) { func TestRemovedFuncWithTTL(t *testing.T) { ch := make(chan bool) - cache := New(5, &Options{ - TTL: time.Millisecond * 50, + cache := New(&Options{ + MaxCount: 5, + TTL:
time.Millisecond * 50, RemovedFunc: func(i interface{}) { _, ok := i.(*testing.T) assert.True(t, ok) @@ -202,9 +206,10 @@ func TestRemovedFuncWithTTL(t *testing.T) { func TestRemovedFuncWithTTL_Pin(t *testing.T) { ch := make(chan bool) - cache := New(5, &Options{ - TTL: time.Millisecond * 50, - Pin: true, + cache := New(&Options{ + MaxCount: 5, + TTL: time.Millisecond * 50, + Pin: true, RemovedFunc: func(i interface{}) { _, ok := i.(*testing.T) assert.True(t, ok) @@ -240,7 +245,7 @@ func TestIterator(t *testing.T) { "D": "Delta", } - cache := NewLRU(5) + cache := New(&Options{MaxCount: 5}) for k, v := range expected { cache.Put(k, v) @@ -264,3 +269,118 @@ func TestIterator(t *testing.T) { it.Close() assert.Equal(t, expected, actual) } + +func TestLRU_SizeBased_SizeExceeded(t *testing.T) { + valueSize := 5 + cache := New(&Options{ + MaxCount: 5, + GetCacheItemSizeFunc: func(interface{}) uint64 { + return uint64(valueSize) + }, + MaxSize: 15, + }) + + cache.Put("A", "Foo") + assert.Equal(t, "Foo", cache.Get("A")) + assert.Nil(t, cache.Get("B")) + assert.Equal(t, 1, cache.Size()) + + cache.Put("B", "Bar") + cache.Put("C", "Cid") + cache.Put("D", "Delt") + assert.Nil(t, cache.Get("A")) + assert.Equal(t, 3, cache.Size()) + + assert.Equal(t, "Bar", cache.Get("B")) + assert.Equal(t, "Cid", cache.Get("C")) + assert.Equal(t, "Delt", cache.Get("D")) + + cache.Put("A", "Foo2") + assert.Equal(t, "Foo2", cache.Get("A")) + assert.Nil(t, cache.Get("B")) + assert.Equal(t, 3, cache.Size()) + + valueSize = 15 // put large value to evict the rest in a loop + cache.Put("E", "Epsi") + assert.Nil(t, cache.Get("C")) + assert.Equal(t, "Epsi", cache.Get("E")) + assert.Nil(t, cache.Get("A")) + assert.Equal(t, 1, cache.Size()) + + valueSize = 25 // put large value greater than maxSize to evict everything + cache.Put("M", "Mepsi") + assert.Nil(t, cache.Get("M")) + assert.Equal(t, 0, cache.Size()) +} + +func TestLRU_SizeBased_CountExceeded(t *testing.T) { + cache := New(&Options{ + MaxCount: 5, + GetCacheItemSizeFunc: func(interface{}) uint64 { + return 5 + }, + MaxSize: 0, + }) + + cache.Put("A", "Foo") + assert.Equal(t, "Foo", cache.Get("A")) + assert.Nil(t, cache.Get("B")) + assert.Equal(t, 1, cache.Size()) + + cache.Put("B", "Bar") + cache.Put("C", "Cid") + cache.Put("D", "Delt") + assert.Equal(t, 4, cache.Size()) + + assert.Equal(t, "Bar", cache.Get("B")) + assert.Equal(t, "Cid", cache.Get("C")) + assert.Equal(t, "Delt", cache.Get("D")) + + cache.Put("A", "Foo2") + assert.Equal(t, "Foo2", cache.Get("A")) + assert.Equal(t, 4, cache.Size()) + + cache.Put("E", "Epsi") + assert.Nil(t, cache.Get("B")) + assert.Equal(t, "Epsi", cache.Get("E")) + assert.Equal(t, "Foo2", cache.Get("A")) + assert.Equal(t, 4, cache.Size()) +} + +func TestPanicMaxCountAndSizeNotProvided(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The LRU was initialized without panic") + } + }() + + New(&Options{ + TTL: time.Millisecond * 100, + GetCacheItemSizeFunc: func(interface{}) uint64 { + return 5 + }, + }) +} + +func TestPanicMaxCountAndSizeFuncNotProvided(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The LRU was initialized without panic") + } + }() + + New(&Options{ + TTL: time.Millisecond * 100, + MaxSize: 25, + }) +} + +func TestPanicOptionsIsNil(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The LRU was initialized without panic") + } + }() + + New(nil) +} diff --git a/common/service/dynamicconfig/constants.go 
b/common/service/dynamicconfig/constants.go index f798ed325c1..1b6eba19be9 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -147,12 +147,13 @@ var keys = map[Key]string{ HistoryCacheMaxSize: "history.cacheMaxSize", HistoryCacheTTL: "history.cacheTTL", HistoryShutdownDrainDuration: "history.shutdownDrainDuration", - EventsCacheInitialSize: "history.eventsCacheInitialSize", - EventsCacheMaxSize: "history.eventsCacheMaxSize", + EventsCacheInitialCount: "history.eventsCacheInitialSize", + EventsCacheMaxCount: "history.eventsCacheMaxSize", + EventsCacheMaxSize: "history.eventsCacheMaxSizeInBytes", EventsCacheTTL: "history.eventsCacheTTL", EventsCacheGlobalEnable: "history.eventsCacheGlobalEnable", - EventsCacheGlobalInitialSize: "history.eventsCacheGlobalInitialSize", - EventsCacheGlobalMaxSize: "history.eventsCacheGlobalMaxSize", + EventsCacheGlobalInitialCount: "history.eventsCacheGlobalInitialSize", + EventsCacheGlobalMaxCount: "history.eventsCacheGlobalMaxSize", AcquireShardInterval: "history.acquireShardInterval", AcquireShardConcurrency: "history.acquireShardConcurrency", StandbyClusterDelay: "history.standbyClusterDelay", @@ -493,18 +494,20 @@ const ( HistoryCacheTTL // HistoryShutdownDrainDuration is the duration of traffic drain during shutdown HistoryShutdownDrainDuration - // EventsCacheInitialSize is initial size of events cache - EventsCacheInitialSize - // EventsCacheMaxSize is max size of events cache + // EventsCacheInitialCount is initial count of events cache + EventsCacheInitialCount + // EventsCacheMaxCount is max count of events cache + EventsCacheMaxCount + // EventsCacheMaxSize is max size of events cache in bytes EventsCacheMaxSize // EventsCacheTTL is TTL of events cache EventsCacheTTL // EventsCacheGlobalEnable enables global cache over all history shards EventsCacheGlobalEnable - // EventsCacheGlobalInitialSize is initial size of global events cache - EventsCacheGlobalInitialSize - // EventsCacheGlobalMaxSize is max size of global events cache - EventsCacheGlobalMaxSize + // EventsCacheGlobalInitialCount is initial count of global events cache + EventsCacheGlobalInitialCount + // EventsCacheGlobalMaxCount is max count of global events cache + EventsCacheGlobalMaxCount // AcquireShardInterval is interval that timer used to acquire shard AcquireShardInterval // AcquireShardConcurrency is number of goroutines that can be used to acquire shards in the shard controller. 
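Reviewer note: the renamed dynamic-config keys above feed the reworked constructor in common/cache, which now takes a single Options value and panics unless either MaxCount (count based) or MaxSize plus GetCacheItemSizeFunc (size based) is supplied. Below is a minimal usage sketch of both modes; the capacities and the string-length size function are illustrative and not taken from this change.

```go
package main

import (
	"fmt"

	"github.com/uber/cadence/common/cache"
)

func main() {
	// Count-based cache: the replacement for the old NewLRU(5) / New(5, opts) calls.
	countBased := cache.New(&cache.Options{MaxCount: 5})
	countBased.Put("A", "Foo")

	// Size-based cache: MaxSize and GetCacheItemSizeFunc must be provided together.
	// Here each entry is weighted by the length of the stored string (illustrative only).
	sizeBased := cache.New(&cache.Options{
		MaxSize: 1 << 20, // hypothetical 1 MiB budget
		GetCacheItemSizeFunc: func(v interface{}) uint64 {
			return uint64(len(v.(string)))
		},
	})
	sizeBased.Put("B", "Bar")

	fmt.Println(countBased.Size(), sizeBased.Size())

	// cache.New(nil), or options with neither mode configured, panics.
}
```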
diff --git a/common/util.go b/common/util.go index a8f3e87bb65..654f6232398 100644 --- a/common/util.go +++ b/common/util.go @@ -537,6 +537,130 @@ func GetSizeOfMapStringToByteArray(input map[string][]byte) int { return res + golandMapReserverNumberOfBytes } +// GetSizeOfHistoryEvent returns the approximate size of the history event in bytes, currently accounting only for byte-array fields +func GetSizeOfHistoryEvent(event *workflow.HistoryEvent) uint64 { + if event == nil { + return 0 + } + + res := 0 + switch *event.EventType { + case workflow.EventTypeWorkflowExecutionStarted: + res += len(event.WorkflowExecutionStartedEventAttributes.Input) + res += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails) + res += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult) + if event.WorkflowExecutionStartedEventAttributes.Memo != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Memo.Fields) + } + if event.WorkflowExecutionStartedEventAttributes.Header != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Header.Fields) + } + if event.WorkflowExecutionStartedEventAttributes.SearchAttributes != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.IndexedFields) + } + case workflow.EventTypeWorkflowExecutionCompleted: + res += len(event.WorkflowExecutionCompletedEventAttributes.Result) + case workflow.EventTypeWorkflowExecutionFailed: + res += len(event.WorkflowExecutionFailedEventAttributes.Details) + case workflow.EventTypeWorkflowExecutionTimedOut: + case workflow.EventTypeDecisionTaskScheduled: + case workflow.EventTypeDecisionTaskStarted: + case workflow.EventTypeDecisionTaskCompleted: + res += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext) + case workflow.EventTypeDecisionTaskTimedOut: + case workflow.EventTypeDecisionTaskFailed: + res += len(event.DecisionTaskFailedEventAttributes.Details) + case workflow.EventTypeActivityTaskScheduled: + res += len(event.ActivityTaskScheduledEventAttributes.Input) + if event.ActivityTaskScheduledEventAttributes.Header != nil { + res += GetSizeOfMapStringToByteArray(event.ActivityTaskScheduledEventAttributes.Header.Fields) + } + case workflow.EventTypeActivityTaskStarted: + res += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails) + case workflow.EventTypeActivityTaskCompleted: + res += len(event.ActivityTaskCompletedEventAttributes.Result) + case workflow.EventTypeActivityTaskFailed: + res += len(event.ActivityTaskFailedEventAttributes.Details) + case workflow.EventTypeActivityTaskTimedOut: + res += len(event.ActivityTaskTimedOutEventAttributes.Details) + res += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails) + case workflow.EventTypeActivityTaskCancelRequested: + case workflow.EventTypeRequestCancelActivityTaskFailed: + case workflow.EventTypeActivityTaskCanceled: + res += len(event.ActivityTaskCanceledEventAttributes.Details) + case workflow.EventTypeTimerStarted: + case workflow.EventTypeTimerFired: + case workflow.EventTypeCancelTimerFailed: + case workflow.EventTypeTimerCanceled: + case workflow.EventTypeWorkflowExecutionCancelRequested: + case workflow.EventTypeWorkflowExecutionCanceled: + res += len(event.WorkflowExecutionCanceledEventAttributes.Details) + case workflow.EventTypeRequestCancelExternalWorkflowExecutionInitiated: + res += len(event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Control) + case
workflow.EventTypeRequestCancelExternalWorkflowExecutionFailed: + res += len(event.RequestCancelExternalWorkflowExecutionFailedEventAttributes.Control) + case workflow.EventTypeExternalWorkflowExecutionCancelRequested: + case workflow.EventTypeMarkerRecorded: + res += len(event.MarkerRecordedEventAttributes.Details) + case workflow.EventTypeWorkflowExecutionSignaled: + res += len(event.WorkflowExecutionSignaledEventAttributes.Input) + case workflow.EventTypeWorkflowExecutionTerminated: + res += len(event.WorkflowExecutionTerminatedEventAttributes.Details) + case workflow.EventTypeWorkflowExecutionContinuedAsNew: + res += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input) + if event.WorkflowExecutionContinuedAsNewEventAttributes.Memo != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.Fields) + } + if event.WorkflowExecutionContinuedAsNewEventAttributes.Header != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.Fields) + } + if event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes != nil { + res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.IndexedFields) + } + case workflow.EventTypeStartChildWorkflowExecutionInitiated: + res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input) + res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control) + if event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo != nil { + res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.Fields) + } + if event.StartChildWorkflowExecutionInitiatedEventAttributes.Header != nil { + res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.Fields) + } + if event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes != nil { + res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.IndexedFields) + } + case workflow.EventTypeStartChildWorkflowExecutionFailed: + res += len(event.StartChildWorkflowExecutionFailedEventAttributes.Control) + case workflow.EventTypeChildWorkflowExecutionStarted: + if event.ChildWorkflowExecutionStartedEventAttributes == nil { + return 0 + } + if event.ChildWorkflowExecutionStartedEventAttributes.Header != nil { + res += GetSizeOfMapStringToByteArray(event.ChildWorkflowExecutionStartedEventAttributes.Header.Fields) + } + case workflow.EventTypeChildWorkflowExecutionCompleted: + res += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result) + case workflow.EventTypeChildWorkflowExecutionFailed: + res += len(event.ChildWorkflowExecutionFailedEventAttributes.Details) + case workflow.EventTypeChildWorkflowExecutionCanceled: + res += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details) + case workflow.EventTypeChildWorkflowExecutionTimedOut: + case workflow.EventTypeChildWorkflowExecutionTerminated: + case workflow.EventTypeSignalExternalWorkflowExecutionInitiated: + res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input) + res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control) + case workflow.EventTypeSignalExternalWorkflowExecutionFailed: + res += len(event.SignalExternalWorkflowExecutionFailedEventAttributes.Control) + case workflow.EventTypeExternalWorkflowExecutionSignaled: + res += 
len(event.ExternalWorkflowExecutionSignaledEventAttributes.Control) + case workflow.EventTypeUpsertWorkflowSearchAttributes: + if event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes != nil { + res += GetSizeOfMapStringToByteArray(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields) + } + } + return uint64(res) +} + // IsJustOrderByClause return true is query start with order by func IsJustOrderByClause(clause string) bool { whereClause := strings.TrimSpace(clause) diff --git a/service/history/config/config.go b/service/history/config/config.go index 13ff2ae5d24..13b3d54ffc2 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -59,12 +59,13 @@ type Config struct { // EventsCache settings // Change of these configs require shard restart - EventsCacheInitialSize dynamicconfig.IntPropertyFn - EventsCacheMaxSize dynamicconfig.IntPropertyFn - EventsCacheTTL dynamicconfig.DurationPropertyFn - EventsCacheGlobalEnable dynamicconfig.BoolPropertyFn - EventsCacheGlobalInitialSize dynamicconfig.IntPropertyFn - EventsCacheGlobalMaxSize dynamicconfig.IntPropertyFn + EventsCacheInitialCount dynamicconfig.IntPropertyFn + EventsCacheMaxCount dynamicconfig.IntPropertyFn + EventsCacheMaxSize dynamicconfig.IntPropertyFn + EventsCacheTTL dynamicconfig.DurationPropertyFn + EventsCacheGlobalEnable dynamicconfig.BoolPropertyFn + EventsCacheGlobalInitialCount dynamicconfig.IntPropertyFn + EventsCacheGlobalMaxCount dynamicconfig.IntPropertyFn // ShardController settings RangeSizeBits uint @@ -265,12 +266,13 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128), HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512), HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour), - EventsCacheInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 128), - EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 512), + EventsCacheInitialCount: dc.GetIntProperty(dynamicconfig.EventsCacheInitialCount, 128), + EventsCacheMaxCount: dc.GetIntProperty(dynamicconfig.EventsCacheMaxCount, 512), + EventsCacheMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 0), EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour), EventsCacheGlobalEnable: dc.GetBoolProperty(dynamicconfig.EventsCacheGlobalEnable, false), - EventsCacheGlobalInitialSize: dc.GetIntProperty(dynamicconfig.EventsCacheInitialSize, 4096), - EventsCacheGlobalMaxSize: dc.GetIntProperty(dynamicconfig.EventsCacheMaxSize, 65536), + EventsCacheGlobalInitialCount: dc.GetIntProperty(dynamicconfig.EventsCacheGlobalInitialCount, 4096), + EventsCacheGlobalMaxCount: dc.GetIntProperty(dynamicconfig.EventsCacheGlobalMaxCount, 131072), RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 1), diff --git a/service/history/events/cache.go b/service/history/events/cache.go index f79fc437263..aa28eb76c9e 100644 --- a/service/history/events/cache.go +++ b/service/history/events/cache.go @@ -87,22 +87,24 @@ var _ Cache = (*cacheImpl)(nil) // NewGlobalCache creates a new global events cache func NewGlobalCache( - initialSize int, - maxSize int, + initialCount int, + maxCount int, ttl 
time.Duration, historyManager persistence.HistoryManager, logger log.Logger, metricsClient metrics.Client, + maxSize uint64, ) Cache { return newCacheWithOption( - initialSize, - maxSize, + nil, + initialCount, + maxCount, ttl, historyManager, false, logger, metricsClient, - nil, + maxSize, ) } @@ -115,33 +117,43 @@ func NewCache( metricsClient metrics.Client, ) Cache { return newCacheWithOption( - config.EventsCacheInitialSize(), - config.EventsCacheMaxSize(), + &shardID, + config.EventsCacheInitialCount(), + config.EventsCacheMaxCount(), config.EventsCacheTTL(), historyManager, false, logger, metricsClient, - &shardID, + 0, ) } func newCacheWithOption( - initialSize int, - maxSize int, + shardID *int, + initialCount int, + maxCount int, ttl time.Duration, historyManager persistence.HistoryManager, disabled bool, logger log.Logger, metrics metrics.Client, - shardID *int, + maxSize uint64, ) *cacheImpl { opts := &cache.Options{} - opts.InitialCapacity = initialSize + opts.InitialCapacity = initialCount opts.TTL = ttl + opts.MaxCount = maxCount + + if maxSize > 0 { + opts.MaxSize = maxSize + opts.GetCacheItemSizeFunc = func(event interface{}) uint64 { + return common.GetSizeOfHistoryEvent(event.(*shared.HistoryEvent)) + } + } return &cacheImpl{ - Cache: cache.New(maxSize, opts), + Cache: cache.New(opts), historyManager: historyManager, disabled: disabled, logger: logger.WithTags(tag.ComponentEventsCache), diff --git a/service/history/events/cache_test.go b/service/history/events/cache_test.go index 12cfde96edc..d9b9bb348cb 100644 --- a/service/history/events/cache_test.go +++ b/service/history/events/cache_test.go @@ -78,8 +78,8 @@ func (s *eventsCacheSuite) TearDownTest() { } func (s *eventsCacheSuite) newTestEventsCache() *cacheImpl { - return newCacheWithOption(16, 32, time.Minute, s.mockHistoryManager, false, s.logger, - metrics.NewClient(tally.NoopScope, metrics.History), common.IntPtr(10)) + return newCacheWithOption(common.IntPtr(10), 16, 32, time.Minute, s.mockHistoryManager, false, s.logger, + metrics.NewClient(tally.NoopScope, metrics.History), 0) } func (s *eventsCacheSuite) TestEventsCacheHitSuccess() { diff --git a/service/history/execution/cache.go b/service/history/execution/cache.go index c3cc35c457d..65d6ea5d732 100644 --- a/service/history/execution/cache.go +++ b/service/history/execution/cache.go @@ -72,9 +72,10 @@ func NewCache(shard shard.Context) *Cache { opts.InitialCapacity = config.HistoryCacheInitialSize() opts.TTL = config.HistoryCacheTTL() opts.Pin = true + opts.MaxCount = config.HistoryCacheMaxSize() return &Cache{ - Cache: cache.New(config.HistoryCacheMaxSize(), opts), + Cache: cache.New(opts), shard: shard, executionManager: shard.GetExecutionManager(), logger: shard.GetLogger().WithTags(tag.ComponentHistoryCache), diff --git a/service/history/resource/resource.go b/service/history/resource/resource.go index 53e2aea2f47..b733a42efa4 100644 --- a/service/history/resource/resource.go +++ b/service/history/resource/resource.go @@ -66,12 +66,13 @@ func New( impl = &Impl{ Resource: serviceResource, eventCache: events.NewGlobalCache( - config.EventsCacheGlobalInitialSize(), - config.EventsCacheGlobalMaxSize(), + config.EventsCacheGlobalInitialCount(), + config.EventsCacheGlobalMaxCount(), config.EventsCacheTTL(), serviceResource.GetHistoryManager(), params.Logger, params.MetricsClient, + uint64(config.EventsCacheMaxSize()), ), } return impl, nil diff --git a/service/matching/pollerHistory.go b/service/matching/pollerHistory.go index 66d497d9346..cfecfd6ace2 100644 
--- a/service/matching/pollerHistory.go +++ b/service/matching/pollerHistory.go @@ -53,10 +53,11 @@ func newPollerHistory() *pollerHistory { InitialCapacity: pollerHistoryInitSize, TTL: pollerHistoryTTL, Pin: false, + MaxCount: pollerHistoryInitMaxSize, } return &pollerHistory{ - history: cache.New(pollerHistoryInitMaxSize, opts), + history: cache.New(opts), } }
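Putting the pieces together, history's events cache can now evict by accumulated byte size: when a non-zero MaxSize reaches newCacheWithOption, each cached event is weighted via common.GetSizeOfHistoryEvent. The sketch below is not part of the diff; the budget, key, and sample event are made up, and the import paths assume the usual uber/cadence layout.

```go
package main

import (
	"fmt"

	"github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/cache"
)

func main() {
	eventsCache := cache.New(&cache.Options{
		InitialCapacity: 128,
		MaxCount:        512,              // only used while the cache stays count based
		MaxSize:         64 * 1024 * 1024, // hypothetical 64 MiB budget enabling size-based eviction
		GetCacheItemSizeFunc: func(v interface{}) uint64 {
			// Mirrors the size function wired up in newCacheWithOption when maxSize > 0.
			return common.GetSizeOfHistoryEvent(v.(*shared.HistoryEvent))
		},
	})

	eventType := shared.EventTypeWorkflowExecutionSignaled
	event := &shared.HistoryEvent{
		EventType: &eventType,
		WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
			Input: []byte("signal payload"), // only byte-array fields count toward the size
		},
	}

	eventsCache.Put("hypothetical-event-key", event)
	fmt.Println(eventsCache.Size())
}
```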