Skip to content

Commit

Permalink
[perf]: More precise producer rate limiter (#989)
Browse files Browse the repository at this point in the history
* perf: more accurate producer rate limiter

* perf: more accurate producer rate limiter
  • Loading branch information
Gleiphir2769 authored Mar 13, 2023
1 parent 352c463 commit bcbac9f
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
go.uber.org/atomic v1.7.0
golang.org/x/mod v0.5.1
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/protobuf v1.26.0
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
17 changes: 6 additions & 11 deletions perf/perf-producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"encoding/json"
"time"

"golang.org/x/time/rate"

"github.com/bmizerany/perks/quantile"
"github.com/spf13/cobra"

Expand Down Expand Up @@ -104,16 +106,9 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
payload := make([]byte, produceArgs.MessageSize)

ch := make(chan float64)
rateLimitCh := make(chan time.Time, produceArgs.Rate)
go func(rateLimit int, interval time.Duration) {
if rateLimit <= 0 { // 0 as no limit enforced
return
}
for {
oldest := <-rateLimitCh
time.Sleep(interval - time.Since(oldest))
}
}(produceArgs.Rate, time.Second)

limit := rate.Every(time.Duration(float64(time.Second) / float64(produceArgs.Rate)))
rateLimiter := rate.NewLimiter(limit, produceArgs.Rate)

go func(stopCh <-chan struct{}) {
for {
Expand All @@ -125,7 +120,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {

start := time.Now()
if produceArgs.Rate > 0 {
rateLimitCh <- start
_ = rateLimiter.Wait(context.TODO())
}

producer.SendAsync(ctx, &pulsar.ProducerMessage{
Expand Down

0 comments on commit bcbac9f

Please sign in to comment.