From a3cb2713e00cfae0072fa93532faaba6b336e711 Mon Sep 17 00:00:00 2001 From: dongjiang Date: Wed, 25 Jun 2025 10:51:36 +0800 Subject: [PATCH] fix priority queue Signed-off-by: dongjiang --- pkg/controller/priorityqueue/priorityqueue.go | 7 +++++++ .../priorityqueue/priorityqueue_test.go | 20 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index c3f77a6f39..3f18918d4d 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -277,6 +277,13 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.waiters.Add(1) w.notifyItemOrWaiterAdded() + + // ref: https://github.com/kubernetes-sigs/controller-runtime/issues/3239 + if w.shutdown.Load() { + var zero T + return zero, 0, true + } + item := <-w.get return item.Key, item.Priority, w.shutdown.Load() diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index ec0f36e95f..7e5d3ba3ed 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -300,6 +300,26 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) }) + // ref: https://github.com/kubernetes-sigs/controller-runtime/issues/3239 + It("Get from priority queue might get stuck when the priority queue is shut down", func() { + q, _ := newQueue() + + q.Add("baz") + // shut down + q.ShutDown() + q.AddWithOpts(AddOpts{After: time.Second}, "foo") + + item, priority, isShutDown := q.GetWithPriority() + Expect(item).To(Equal("")) + Expect(priority).To(Equal(0)) + Expect(isShutDown).To(BeTrue()) + + item1, priority1, isShutDown := q.GetWithPriority() + Expect(item1).To(Equal("")) + Expect(priority1).To(Equal(0)) + Expect(isShutDown).To(BeTrue()) + }) + It("items are included in Len() and the queueDepth metric once they are ready", func() { q, metrics := newQueue() defer q.ShutDown()