From 783ade27bec662b885b265354d2470859dc6173f Mon Sep 17 00:00:00 2001 From: Bohdan Storozhuk Date: Sat, 2 Jul 2022 06:59:56 +0100 Subject: [PATCH] Restore int64 based atomic rate limiter (#94) This limiter was introduced and merged in the following PR #85 Later @twelsh-aw found an issue with this implementation #90 So @rabbbit reverted this change in #91 Our tests did not detect this issue, so we have a separate PR #93 that enhances our tests approach to detect potential errors better. With this PR, we want to restore the int64-based atomic rate limiter implementation as a non-default rate limiter and then check that #93 will detect the bug. Right after it, we'll open a subsequent PR to fix this bug. --- limiter_atomic_int64.go | 90 +++++++++++++++++++++++++++++++++++++++++ ratelimit_bench_test.go | 9 +++-- ratelimit_test.go | 12 +++++- tools/go.mod | 1 + 4 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 limiter_atomic_int64.go diff --git a/limiter_atomic_int64.go b/limiter_atomic_int64.go new file mode 100644 index 0000000..6588cd0 --- /dev/null +++ b/limiter_atomic_int64.go @@ -0,0 +1,90 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ratelimit // import "go.uber.org/ratelimit" + +import ( + "time" + + "sync/atomic" +) + +type atomicInt64Limiter struct { + //lint:ignore U1000 Padding is unused but it is crucial to maintain performance + // of this rate limiter in case of collocation with other frequently accessed memory. + prepadding [64]byte // cache line size = 64; created to avoid false sharing. + state int64 // unix nanoseconds of the next permissions issue. + //lint:ignore U1000 like prepadding. + postpadding [56]byte // cache line size - state size = 64 - 8; created to avoid false sharing. + + perRequest time.Duration + maxSlack time.Duration + clock Clock +} + +// newAtomicBased returns a new atomic based limiter. +func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter { + // TODO consider moving config building to the implementation + // independent code. + config := buildConfig(opts) + perRequest := config.per / time.Duration(rate) + l := &atomicInt64Limiter{ + perRequest: perRequest, + maxSlack: time.Duration(config.slack) * perRequest, + clock: config.clock, + } + atomic.StoreInt64(&l.state, 0) + return l +} + +// Take blocks to ensure that the time spent between multiple +// Take calls is on average time.Second/rate. +func (t *atomicInt64Limiter) Take() time.Time { + var ( + newTimeOfNextPermissionIssue int64 + now int64 + ) + for { + now = t.clock.Now().UnixNano() + timeOfNextPermissionIssue := atomic.LoadInt64(&t.state) + + switch { + case timeOfNextPermissionIssue == 0: + // If this is our first request, then we allow it. + newTimeOfNextPermissionIssue = now + case now-timeOfNextPermissionIssue > int64(t.maxSlack): + // a lot of nanoseconds passed since the last Take call + // we will limit max accumulated time to maxSlack + newTimeOfNextPermissionIssue = now - int64(t.maxSlack) + default: + // calculate the time at which our permission was issued + newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest) + } + + if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) { + break + } + } + nanosToSleepUntilOurPermissionIsIssued := newTimeOfNextPermissionIssue - now + if nanosToSleepUntilOurPermissionIsIssued > 0 { + t.clock.Sleep(time.Duration(nanosToSleepUntilOurPermissionIsIssued)) + } + return time.Unix(0, newTimeOfNextPermissionIssue) +} diff --git a/ratelimit_bench_test.go b/ratelimit_bench_test.go index 60b203b..a1125c0 100644 --- a/ratelimit_bench_test.go +++ b/ratelimit_bench_test.go @@ -14,8 +14,9 @@ func BenchmarkRateLimiter(b *testing.B) { for _, procs := range []int{1, 4, 8, 16} { runtime.GOMAXPROCS(procs) for name, limiter := range map[string]Limiter{ - "atomic": New(b.N * 10000000), - "mutex": newMutexBased(b.N * 10000000), + "atomic": newAtomicBased(b.N * 1000000000000), + "atomic_int64": newAtomicInt64Based(b.N * 1000000000000), + "mutex": newMutexBased(b.N * 1000000000000), } { for ng := 1; ng < 16; ng++ { runner(b, name, procs, ng, limiter, count) @@ -47,7 +48,9 @@ func BenchmarkRateLimiter(b *testing.B) { } func runner(b *testing.B, name string, procs int, ng int, limiter Limiter, count *atomic.Int64) bool { - return b.Run(fmt.Sprintf("type:%s-procs:%d-goroutines:%d", name, procs, ng), func(b *testing.B) { + return b.Run(fmt.Sprintf("type:%s;max_procs:%d;goroutines:%d", name, procs, ng), func(b *testing.B) { + b.ReportAllocs() + var wg sync.WaitGroup trigger := atomic.NewBool(true) n := b.N diff --git a/ratelimit_test.go b/ratelimit_test.go index 7b584b5..16f0a19 100644 --- a/ratelimit_test.go +++ b/ratelimit_test.go @@ -54,13 +54,23 @@ func runTest(t *testing.T, fn func(testRunner)) { return newAtomicBased(rate, opts...) }, }, + { + name: "atomic_int64", + constructor: func(rate int, opts ...Option) Limiter { + return newAtomicInt64Based(rate, opts...) + }, + }, } for _, tt := range impls { t.Run(tt.name, func(t *testing.T) { + // Set a non-default time.Time since some limiters (int64 in particular) use + // the default value as "non-initialized" state. + clockMock := clock.NewMock() + clockMock.Set(time.Now()) r := runnerImpl{ t: t, - clock: clock.NewMock(), + clock: clockMock, constructor: tt.constructor, doneCh: make(chan struct{}), } diff --git a/tools/go.mod b/tools/go.mod index ba45a9b..c15076e 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -11,6 +11,7 @@ require ( require ( github.com/BurntSushi/toml v1.0.0 // indirect + github.com/storozhukBM/benchart v1.0.0 golang.org/x/exp/typeparams v0.0.0-20220328175248-053ad81199eb // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f // indirect