diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go
index d6b25f6419..2b09432f22 100644
--- a/examples/priorityqueue/main.go
+++ b/examples/priorityqueue/main.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/controller-runtime/pkg/config"
@@ -48,7 +49,7 @@ func run() error {
 
 	// Setup a Manager
 	mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
-		Controller: config.Controller{UsePriorityQueue: true},
+		Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
 	})
 	if err != nil {
 		return fmt.Errorf("failed to set up controller-manager: %w", err)
diff --git a/pkg/config/controller.go b/pkg/config/controller.go
index b702f2838d..0b2aa0cb7b 100644
--- a/pkg/config/controller.go
+++ b/pkg/config/controller.go
@@ -58,5 +58,5 @@ type Controller struct {
 	// priority queue.
 	//
 	// Note: This flag is disabled by default until a future version. It's currently in beta.
-	UsePriorityQueue bool
+	UsePriorityQueue *bool
 }
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 47f8aecd1c..b7d7286033 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/go-logr/logr"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 
 	"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
 	"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -190,7 +191,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
 	}
 
 	if options.RateLimiter == nil {
-		if mgr.GetControllerOptions().UsePriorityQueue {
+		if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
 			options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
 		} else {
 			options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
@@ -199,7 +200,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
 
 	if options.NewQueue == nil {
 		options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
-			if mgr.GetControllerOptions().UsePriorityQueue {
+			if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
 				return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
 					o.RateLimiter = rateLimiter
 				})
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 02fbf27dc2..1c5b11d709 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -441,7 +441,7 @@ var _ = Describe("controller.Controller", func() {
 
 		It("should configure a priority queue if UsePriorityQueue is set", func() {
 			m, err := manager.New(cfg, manager.Options{
-				Controller: config.Controller{UsePriorityQueue: true},
+				Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
 			})
 			Expect(err).NotTo(HaveOccurred())
 
diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go
index bfb31ffc1e..f3ac226eea 100644
--- a/pkg/controller/priorityqueue/metrics.go
+++ b/pkg/controller/priorityqueue/metrics.go
@@ -66,6 +66,7 @@ type defaultQueueMetrics[T comparable] struct {
 	retries workqueue.CounterMetric
 }
 
+// add is called for ready items only
 func (m *defaultQueueMetrics[T]) add(item T) {
 	if m == nil {
 		return
diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go
index 8f9adf2629..a2e80d3065 100644
--- a/pkg/controller/priorityqueue/priorityqueue.go
+++ b/pkg/controller/priorityqueue/priorityqueue.go
@@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
 	}
 
 	pq := &priorityqueue[T]{
-		items:   map[T]*item[T]{},
-		queue:   btree.NewG(32, less[T]),
-		metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
+		items:       map[T]*item[T]{},
+		queue:       btree.NewG(32, less[T]),
+		becameReady: sets.Set[T]{},
+		metrics:     newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
 		// itemOrWaiterAdded indicates that an item or
 		// waiter was added. It must be buffered, because
 		// if we currently process items we can't tell
@@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
 
 type priorityqueue[T comparable] struct {
 	// lock has to be acquired for any access any of items, queue, addedCounter
-	// or metrics.
-	lock    sync.Mutex
-	items   map[T]*item[T]
-	queue   bTree[*item[T]]
-	metrics queueMetrics[T]
+	// or becameReady
+	lock  sync.Mutex
+	items map[T]*item[T]
+	queue bTree[*item[T]]
 
 	// addedCounter is a counter of elements added, we need it
 	// because unixNano is not guaranteed to be unique.
 	addedCounter uint64
 
+	// becameReady holds items that are in the queue, were added
+	// with non-zero after and became ready. We need it to call the
+	// metrics add exactly once for them.
+	becameReady sets.Set[T]
+	metrics     queueMetrics[T]
+
 	itemOrWaiterAdded chan struct{}
 
 	rateLimiter workqueue.TypedRateLimiter[T]
@@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
 			}
 			w.items[key] = item
 			w.queue.ReplaceOrInsert(item)
-			w.metrics.add(key)
+			if item.readyAt == nil {
+				w.metrics.add(key)
+			}
 			w.addedCounter++
 			continue
 		}
@@ -195,19 +203,25 @@ func (w *priorityqueue[T]) spin() {
 			w.lockedLock.Lock()
 			defer w.lockedLock.Unlock()
 
+			// manipulating the tree from within Ascend might lead to panics, so
+			// track what we want to delete and do it after we are done ascending.
+			var toDelete []*item[T]
 			w.queue.Ascend(func(item *item[T]) bool {
-				if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
-					return false
+				if item.readyAt != nil {
+					if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
+						nextReady = w.tick(readyAt)
+						return false
+					}
+					if !w.becameReady.Has(item.key) {
+						w.metrics.add(item.key)
+						w.becameReady.Insert(item.key)
+					}
 				}
 
-				// No next element we can process
-				if item.readyAt != nil && item.readyAt.After(w.now()) {
-					readyAt := item.readyAt.Sub(w.now())
-					if readyAt <= 0 { // Toctou race with the above check
-						readyAt = 1
-					}
-					nextReady = w.tick(readyAt)
-					return false
+				if w.waiters.Load() == 0 {
+					// Have to keep iterating here to ensure we update metrics
+					// for further items that became ready and set nextReady.
+					return true
 				}
 
 				// Item is locked, we can not hand it out
@@ -219,11 +233,16 @@ func (w *priorityqueue[T]) spin() {
 				w.locked.Insert(item.key)
 				w.waiters.Add(-1)
 				delete(w.items, item.key)
-				w.queue.Delete(item)
+				toDelete = append(toDelete, item)
+				w.becameReady.Delete(item.key)
 				w.get <- *item
 
 				return true
 			})
+
+			for _, item := range toDelete {
+				w.queue.Delete(item)
+			}
 		}()
 	}
 }
@@ -279,22 +298,36 @@ func (w *priorityqueue[T]) ShutDown() {
 	close(w.done)
 }
 
+// ShutDownWithDrain just calls ShutDown, as the draining
+// functionality is not used by controller-runtime.
 func (w *priorityqueue[T]) ShutDownWithDrain() {
 	w.ShutDown()
 }
 
+// Len returns the number of items that are ready to be
+// picked up. It does not include items that are not yet
+// ready.
 func (w *priorityqueue[T]) Len() int {
 	w.lock.Lock()
 	defer w.lock.Unlock()
 
-	return w.queue.Len()
+	var result int
+	w.queue.Ascend(func(item *item[T]) bool {
+		if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
+			result++
+			return true
+		}
+		return false
+	})
+
+	return result
 }
 
 func less[T comparable](a, b *item[T]) bool {
 	if a.readyAt == nil && b.readyAt != nil {
 		return true
 	}
-	if a.readyAt != nil && b.readyAt == nil {
+	if b.readyAt == nil && a.readyAt != nil {
 		return false
 	}
 	if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
@@ -329,5 +362,4 @@ type bTree[T any] interface {
 	ReplaceOrInsert(item T) (_ T, _ bool)
 	Delete(item T) (T, bool)
 	Ascend(iterator btree.ItemIteratorG[T])
-	Len() int
 }
diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go
index 13bd5fc8d3..0e201a3986 100644
--- a/pkg/controller/priorityqueue/priorityqueue_test.go
+++ b/pkg/controller/priorityqueue/priorityqueue_test.go
@@ -2,6 +2,7 @@ package priorityqueue
 
 import (
 	"fmt"
+	"math/rand/v2"
 	"sync"
 	"testing"
 	"time"
@@ -283,6 +284,101 @@ var _ = Describe("Controllerworkqueue", func() {
 		Expect(metrics.depth["test"]).To(Equal(0))
 		Expect(metrics.adds["test"]).To(Equal(2))
 	})
+
+	It("doesn't include non-ready items in Len()", func() {
+		q, metrics := newQueue()
+		defer q.ShutDown()
+
+		q.AddWithOpts(AddOpts{After: time.Minute}, "foo")
+		q.AddWithOpts(AddOpts{}, "baz")
+		q.AddWithOpts(AddOpts{After: time.Minute}, "bar")
+		q.AddWithOpts(AddOpts{}, "bal")
+
+		Expect(q.Len()).To(Equal(2))
+		Expect(metrics.depth).To(HaveLen(1))
+		Expect(metrics.depth["test"]).To(Equal(2))
+	})
+
+	It("items are included in Len() and the queueDepth metric once they are ready", func() {
+		q, metrics := newQueue()
+		defer q.ShutDown()
+
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
+		q.AddWithOpts(AddOpts{}, "baz")
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
+		q.AddWithOpts(AddOpts{}, "bal")
+
+		Expect(q.Len()).To(Equal(2))
+		metrics.mu.Lock()
+		Expect(metrics.depth["test"]).To(Equal(2))
+		metrics.mu.Unlock()
+		time.Sleep(time.Second)
+		Expect(q.Len()).To(Equal(4))
+		metrics.mu.Lock()
+		Expect(metrics.depth["test"]).To(Equal(4))
+		metrics.mu.Unlock()
+
+		// Drain queue
+		for range 4 {
+			item, _ := q.Get()
+			q.Done(item)
+		}
+		Expect(q.Len()).To(Equal(0))
+		metrics.mu.Lock()
+		Expect(metrics.depth["test"]).To(Equal(0))
+		metrics.mu.Unlock()
+
+		// Validate that doing it again still works to notice bugs with removing
+		// it from the queues becameReady tracking.
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
+		q.AddWithOpts(AddOpts{}, "baz")
+		q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
+		q.AddWithOpts(AddOpts{}, "bal")
+
+		Expect(q.Len()).To(Equal(2))
+		metrics.mu.Lock()
+		Expect(metrics.depth["test"]).To(Equal(2))
+		metrics.mu.Unlock()
+		time.Sleep(time.Second)
+		Expect(q.Len()).To(Equal(4))
+		metrics.mu.Lock()
+		Expect(metrics.depth["test"]).To(Equal(4))
+		metrics.mu.Unlock()
+	})
+
+	It("returns many items", func() {
+		// This test ensures the queue is able to drain a large queue without panic'ing.
+		// In a previous version of the code we were calling queue.Delete within q.Ascend
+		// which led to a panic in queue.Ascend > iterate:
+		// "panic: runtime error: index out of range [0] with length 0"
+		q, _ := newQueue()
+		defer q.ShutDown()
+
+		for range 20 {
+			for i := range 1000 {
+				rn := rand.N(100) //nolint:gosec // We don't need cryptographically secure entropy here
+				if rn < 10 {
+					q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
+				} else {
+					q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
+				}
+			}
+
+			wg := sync.WaitGroup{}
+			for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for range 10 {
+						obj, _, _ := q.GetWithPriority()
+						q.Done(obj)
+					}
+				}()
+			}
+
+			wg.Wait()
+		}
+	})
 })
 
 func BenchmarkAddGetDone(b *testing.B) {
@@ -438,10 +534,6 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
 	}
 
 	wg.Wait()
-
-	if expected := len(inQueue); expected != q.Len() {
-		t.Errorf("Expected queue length to be %d, was %d", expected, q.Len())
-	}
 }
 
 func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
@@ -453,6 +545,8 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
 		bTree: q.(*priorityqueue[string]).queue,
 	}
 
+	// validate that tick always gets a positive value as it will just return
+	// nil otherwise, which results in blocking forever.
 	upstreamTick := q.(*priorityqueue[string]).tick
 	q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time {
 		if d <= 0 {
@@ -477,7 +571,7 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s
 }
 
 func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) {
-	// There is node codepath that deletes an item that doesn't exist
+	// There is no codepath that deletes an item that doesn't exist
 	old, existed := b.bTree.Delete(item)
 	if !existed {
 		panic(fmt.Sprintf("Delete: item %v not found", item))
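The Len() behavior exercised by the tests above can be summarized with a short, hypothetical usage sketch; the queue name "example", the item keys, and the package main wrapper are illustrative assumptions and are not part of this patch:

package main

import (
	"fmt"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
)

func main() {
	// New takes a queue name plus optional configuration functions.
	q := priorityqueue.New[string]("example")
	defer q.ShutDown()

	// Items added with a non-zero After stay in the queue but are no longer
	// counted by Len() or the depth metric until they become ready.
	q.AddWithOpts(priorityqueue.AddOpts{After: time.Minute}, "delayed")
	q.AddWithOpts(priorityqueue.AddOpts{}, "ready")

	fmt.Println(q.Len()) // with this change, expected to print 1
}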
diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go
index 9fd912f882..57107f20e9 100644
--- a/pkg/handler/eventhandler.go
+++ b/pkg/handler/eventhandler.go
@@ -194,8 +194,8 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
 }
 
 // isObjectUnchanged checks if the object in a create event is unchanged, for example because
-// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check
-// if the object is older than one minute.
+// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
+// than one minute.
 func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
 	return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
 }
diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go
index 5679d9dffe..6e57c22c3b 100644
--- a/pkg/handler/eventhandler_test.go
+++ b/pkg/handler/eventhandler_test.go
@@ -776,7 +776,7 @@ var _ = Describe("Eventhandler", func() {
 	})
 
 	Describe("WithLowPriorityWhenUnchanged", func() {
-		It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() {
+		It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
			actualOpts := priorityqueue.AddOpts{}
 			var actualRequests []reconcile.Request
 			wq := &fakePriorityQueue{
@@ -797,7 +797,7 @@ var _ = Describe("Eventhandler", func() {
 			Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
 		})
 
-		It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() {
+		It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
 			actualOpts := priorityqueue.AddOpts{}
 			var actualRequests []reconcile.Request
 			wq := &fakePriorityQueue{
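Relatedly, a minimal sketch of opting a manager into the priority queue now that Controller.UsePriorityQueue is a *bool, mirroring examples/priorityqueue/main.go above; the bare panic on error is an illustrative shortcut, not part of the patch:

package main

import (
	"k8s.io/utils/ptr"

	kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/config"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Leaving UsePriorityQueue nil keeps the default workqueue, because
	// controller.New now reads the flag via ptr.Deref(..., false).
	mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
		Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
	})
	if err != nil {
		panic(err)
	}
	_ = mgr
}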