Skip to content

Commit

Permalink
add cli flags to customize worqueue rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
ImpSy committed Jan 20, 2023
1 parent 358099a commit bc88547
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 6 deletions.
12 changes: 11 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/golang/glog"
"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -77,6 +78,9 @@ var (
pprofPort = flag.String("pprof-port", "6060", "Port for the pprof endpoint.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
disableExecutorReporting = flag.Bool("disable-executor-reporting", false, "Disable Executors State Reporting in the SparkApplication Custom Resource")
workqueueTokenRefillRate = flag.Int("workqueue-token-refill-rate", 50, "")
workqueueTokenBucketSize = flag.Int("workqueue-token-bucket-size", 500, "")
workqueueMaxDelay = flag.Duration("workqueue-max-delay", rate.InfDuration, "")
metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
)
Expand Down Expand Up @@ -195,8 +199,14 @@ func main() {
util.InitializePProf(*pprofConfig)
}

workqueueRateLimitCfg := util.RatelimitConfig{
QueueTokenRefillRate: *workqueueTokenRefillRate,
QueueTokenBucketSize: *workqueueTokenBucketSize,
MaxDelay: *workqueueMaxDelay,
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService, *disableExecutorReporting)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService, *disableExecutorReporting, workqueueRateLimitCfg)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/golang/glog"
"github.com/google/uuid"
"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -95,6 +94,7 @@ func NewController(
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool,
disableExecutorReporting bool,
ratelimitCfg util.RatelimitConfig,
) *Controller {
crdscheme.AddToScheme(scheme.Scheme)

Expand All @@ -105,7 +105,7 @@ func NewController(
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "spark-operator"})

return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, ingressClassName, batchSchedulerMgr, enableUIService, disableExecutorReporting)
return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, ingressClassName, batchSchedulerMgr, enableUIService, disableExecutorReporting, ratelimitCfg)
}

func newSparkApplicationController(
Expand All @@ -120,9 +120,9 @@ func newSparkApplicationController(
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool,
disableExecutorReporting bool,
ratelimitCfg util.RatelimitConfig,
) *Controller {
queue := workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(queueTokenRefillRate), queueTokenBucketSize)},
"spark-application-controller")
queue := util.CreateNamedRateLimitingQueue("spark-application-controller", ratelimitCfg)

controller := &Controller{
crdClient: crdClient,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", "", nil, true, false)
&util.MetricConfig{}, "", "", nil, true, false, util.RatelimitConfig{})

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand Down
28 changes: 28 additions & 0 deletions pkg/util/workqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package util

import (
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

type RatelimitConfig struct {
QueueTokenRefillRate int
QueueTokenBucketSize int
MaxDelay time.Duration
}

func NewRateLimiter(config RatelimitConfig) workqueue.RateLimiter {
ratelimiter := workqueue.NewWithMaxWaitRateLimiter(
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(config.QueueTokenRefillRate), config.QueueTokenBucketSize)},
config.MaxDelay,
)
return ratelimiter
}

func CreateNamedRateLimitingQueue(name string, config RatelimitConfig) workqueue.RateLimitingInterface {
rateLimiter := NewRateLimiter(config)
queue := workqueue.NewNamedRateLimitingQueue(rateLimiter, name)
return queue
}

0 comments on commit bc88547

Please sign in to comment.