diff --git a/.golangci.yml b/.golangci.yml index 7390d2024b..a8ca869d13 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -158,6 +158,12 @@ linters: # used through an interface and not directly..? # Likely same issue as https://github.com/dominikh/go-tools/issues/1616 path: pkg/controller/priorityqueue/metrics\.go + - linters: + - unused + # Seems to incorrectly trigger on the two implementations that are only + # used through an interface and not directly..? + # Likely same issue as https://github.com/dominikh/go-tools/issues/1616 + path: pkg/controller/priorityqueue/priorityqueue_test\.go # The following are being worked on to remove their exclusion. This list should be reduced or go away all together over time. # If it is decided they will not be addressed they should be moved above this comment. - path: (.+)\.go$ diff --git a/pkg/controller/priorityqueue/fairqueue.go b/pkg/controller/priorityqueue/fairqueue.go new file mode 100644 index 0000000000..e1dc146ff4 --- /dev/null +++ b/pkg/controller/priorityqueue/fairqueue.go @@ -0,0 +1,97 @@ +package priorityqueue + +import ( + "slices" + + "k8s.io/utils/ptr" +) + +type fairq[T comparable] struct { + children map[string]*fairq[T] + items []*item[T] + // order is the order in which to dequeue. Nil is used + // as a magic value to indicate we should dequeue from + // items instead of a child next. + order []*string +} + +func (q *fairq[T]) add(i *item[T], dimensions ...string) { + if len(dimensions) == 0 { + if len(q.order) == len(q.children) { + q.order = append([]*string{nil}, q.order...) + } + q.items = append(q.items, i) + + // Sort here so items that get unlocked can be added, rather + // than having to completely drain and rebuild the fairQueue. + slices.SortFunc(q.items, func(a, b *item[T]) int { + if less(a, b) { + return -1 + } + return 1 + }) + + return + } + + dimension := dimensions[0] + dimensions = dimensions[1:] + + if q.children == nil { + q.children = make(map[string]*fairq[T], 1) + } + + _, exists := q.children[dimension] + if !exists { + q.children[dimension] = &fairq[T]{} + q.order = append([]*string{ptr.To(dimension)}, q.order...) + } + + q.children[dimension].add(i, dimensions...) +} + +func (q *fairq[T]) dequeue() (*item[T], bool) { + var item *item[T] + var hasItem bool + + for idx, dimension := range q.order { + switch { + case dimension != nil: // child element + item, hasItem = q.children[*dimension].dequeue() + if !hasItem { + continue + } + case len(q.items) > 0: // leaf element + item = q.items[0] + q.items = q.items[1:] + default: // no items for current dimension, check next + continue + } + + q.order = append(q.order[:idx], q.order[idx+1:]...) + q.order = append(q.order, dimension) + + return item, true + } + + return item, false +} + +func (q *fairq[T]) drain() { + for _, child := range q.children { + child.drain() + } + q.items = nil +} + +func (q *fairq[T]) isEmpty() bool { + if len(q.items) > 0 { + return false + } + for _, child := range q.children { + if !child.isEmpty() { + return false + } + } + return true +} diff --git a/pkg/controller/priorityqueue/fairqueue_test.go b/pkg/controller/priorityqueue/fairqueue_test.go new file mode 100644 index 0000000000..f9173863c3 --- /dev/null +++ b/pkg/controller/priorityqueue/fairqueue_test.go @@ -0,0 +1,87 @@ +package priorityqueue + +import ( + "strings" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("FairQueue", func() { + DescribeTable("Ordering tests", + Entry("simple", + []string{"foo", "foo", "bar"}, + []string{"bar", "foo", "foo"}, + ), + Entry("balanced", + []string{"foo", "foo", "bar", "bar"}, + []string{"bar", "foo", "bar", "foo"}, + ), + Entry("mixed-dimensional, multi-dimensional comes first", + []string{"foo-foo", "foo-foo", "foo"}, + []string{"foo", "foo-foo", "foo-foo"}, + ), + Entry("mixed-dimensional, single dimension comes first", + []string{"foo", "foo-foo", "foo-foo"}, + []string{"foo-foo", "foo", "foo-foo"}, + ), + Entry("complex mixed-dimensional", + []string{"bar", "foo", "foo-foo", "foo-foo", "foo-bar"}, + // First the foo with the latest-added second dimension, then the bar, + // then the remaining foos based on the second dimension. + []string{"foo-bar", "bar", "foo-foo", "foo", "foo-foo"}, + ), + + func(adds []string, expected []string) { + q := &fairq[string]{} + for idx, add := range adds { + q.add(&item[string]{Key: add, AddedCounter: uint64(idx)}, strings.Split(add, "-")...) + } + + actual := make([]string, 0, len(expected)) + for range len(expected) { + item, didGetItem := q.dequeue() + Expect(didGetItem).To(BeTrue()) + actual = append(actual, item.Key) + } + + _, didGetItem := q.dequeue() + Expect(didGetItem).To(BeFalse()) + Expect(actual).To(Equal(expected)) + }, + ) + + It("retains fairness across queue operations", func() { + q := &fairq[string]{} + q.add(&item[string]{Key: "foo"}, "foo") + _, _ = q.dequeue() + q.add(&item[string]{Key: "bar", AddedCounter: 1}, "bar") + q.add(&item[string]{Key: "foo", AddedCounter: 2}, "foo") + + item, _ := q.dequeue() + Expect(item.Key).To(Equal("bar")) + Expect(q.isEmpty()).To(BeFalse()) + }) + + It("sorts by added counter", func() { + q := &fairq[string]{} + q.add(&item[string]{Key: "foo", AddedCounter: 2}) + q.add(&item[string]{Key: "bar", AddedCounter: 1}) + + item, _ := q.dequeue() + Expect(item.Key).To(Equal("bar")) + Expect(q.isEmpty()).To(BeFalse()) + }) + + It("drains all items", func() { + q := &fairq[string]{} + q.add(&item[string]{Key: "foo"}, "foo") + q.add(&item[string]{Key: "bar"}, "bar") + q.add(&item[string]{Key: "baz"}, "baz") + + q.drain() + _, gotItem := q.dequeue() + Expect(gotItem).To(BeFalse()) + Expect(q.isEmpty()).To(BeTrue()) + }) +}) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 1f1a245849..f0af3efbe6 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -38,7 +38,18 @@ type Opts[T comparable] struct { // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. RateLimiter workqueue.TypedRateLimiter[T] MetricProvider workqueue.MetricsProvider - Log logr.Logger + // FairnessDimensionsExtractor can be configured to make the PriorityQueue return items + // with the same priority fairly based on the returned dimensions. This could for + // example be the items Namespace. Doing this ensures that one Namespace with a lot + // of events can not starve other Namespaces. + // + // If more than one dimension is returned, fairness is first ensured within the first + // dimension, then within the second dimension and so on. + // + // If you want the opposite, i.E. explicitly priorize specific events, call AddWithOpts with a + // higher priority. + FairnessDimensionsExtractor func(T) []string + Log logr.Logger } // Opt allows to configure a PriorityQueue. @@ -59,12 +70,18 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { opts.MetricProvider = metrics.WorkqueueMetricsProvider{} } + if opts.FairnessDimensionsExtractor == nil { + opts.FairnessDimensionsExtractor = func(T) []string { return nil } + } + pq := &priorityqueue[T]{ - log: opts.Log, - items: map[T]*item[T]{}, - queue: btree.NewG(32, less[T]), - becameReady: sets.Set[T]{}, - metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), + log: opts.Log, + items: map[T]*item[T]{}, + queue: btree.NewG(32, less[T]), + fairQueue: &fairq[T]{}, + extractFairnessDimensions: opts.FairnessDimensionsExtractor, + becameReady: sets.Set[T]{}, + metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), // itemOrWaiterAdded indicates that an item or // waiter was added. It must be buffered, because // if we currently process items we can't tell @@ -89,19 +106,34 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { type priorityqueue[T comparable] struct { log logr.Logger - // lock has to be acquired for any access any of items, queue, addedCounter - // or becameReady + // lock has to be acquired before accessing any of items, + // queue, fairQueue, fairQueue, addedCounter, becameReady or + // locked. lock sync.Mutex items map[T]*item[T] queue bTree[*item[T]] + // fairQueue ensures that the items we hand out are far across + // configurable dimensions, for example the namespace. + // It holds all items that are ready to be handed out and in the + // highest priority bracket that has ready items, as well as state + // to determine the fairness. + fairQueue fairQueueInterface[T] + + // fairQueuePriority is the priority of the items in the fairQueue. + // it may be nil, indicating the fairQueuePriority is currently + // unknown. + fairQueuePriority *int + + extractFairnessDimensions func(item T) []string + // addedCounter is a counter of elements added, we need it // because unixNano is not guaranteed to be unique. addedCounter uint64 // becameReady holds items that are in the queue, were added - // with non-zero after and became ready. We need it to call the - // metrics add exactly once for them. + // with non-zero after and became ready. We need it to call + // metrics.add and fairQueue.add exactly once for them. becameReady sets.Set[T] metrics queueMetrics[T] @@ -111,8 +143,7 @@ type priorityqueue[T comparable] struct { // locked contains the keys we handed out through Get() and that haven't // yet been returned through Done(). - locked sets.Set[T] - lockedLock sync.RWMutex + locked sets.Set[T] shutdown atomic.Bool done chan struct{} @@ -150,6 +181,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { readyAt = ptr.To(w.now().Add(after)) w.metrics.retry() } + + if readyAt == nil && !w.locked.Has(key) && w.fairQueuePriority != nil && *w.fairQueuePriority < o.Priority { + w.fairQueue.drain() + w.fairQueuePriority = nil // Leave nil here in case something at that priority becomes ready in parallel + } + if _, ok := w.items[key]; !ok { item := &item[T]{ Key: key, @@ -161,6 +198,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.queue.ReplaceOrInsert(item) if item.ReadyAt == nil { w.metrics.add(key, item.Priority) + w.maybeAddToFairQueueLocked(item) } w.addedCounter++ continue @@ -175,11 +213,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) } item.Priority = o.Priority + + w.maybeAddToFairQueueLocked(item) } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { if readyAt == nil && !w.becameReady.Has(key) { w.metrics.add(key, item.Priority) + w.maybeAddToFairQueueLocked(item) } item.ReadyAt = readyAt } @@ -218,12 +259,8 @@ func (w *priorityqueue[T]) spin() { w.lock.Lock() defer w.lock.Unlock() - w.lockedLock.Lock() - defer w.lockedLock.Unlock() - - // manipulating the tree from within Ascend might lead to panics, so - // track what we want to delete and do it after we are done ascending. - var toDelete []*item[T] + fairQueuePopulating: + fairQueueNeedsPopulating := w.fairQueuePriority == nil w.queue.Ascend(func(item *item[T]) bool { if item.ReadyAt != nil { if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { @@ -233,38 +270,70 @@ func (w *priorityqueue[T]) spin() { if !w.becameReady.Has(item.Key) { w.metrics.add(item.Key, item.Priority) w.becameReady.Insert(item.Key) + if !fairQueueNeedsPopulating { + w.maybeAddToFairQueueLocked(item) + } } } - if w.waiters.Load() == 0 { - // Have to keep iterating here to ensure we update metrics - // for further items that became ready and set nextReady. - return true - } - // Item is locked, we can not hand it out if w.locked.Has(item.Key) { return true } + if w.fairQueuePriority == nil { + w.fairQueuePriority = ptr.To(item.Priority) + } + + if fairQueueNeedsPopulating { + w.maybeAddToFairQueueLocked(item) + } + + return true + }) + + if w.fairQueue.isEmpty() { + return + } + + for w.waiters.Load() > 0 { + item, hasItem := w.fairQueue.dequeue() + if !hasItem { + w.fairQueuePriority = nil + if w.waiters.Load() > 0 { + // There could be ready items that have a lower priority + goto fairQueuePopulating + } + break + } + w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) w.waiters.Add(-1) delete(w.items, item.Key) - toDelete = append(toDelete, item) w.becameReady.Delete(item.Key) w.get <- *item - - return true - }) - - for _, item := range toDelete { w.queue.Delete(item) + + if w.fairQueue.isEmpty() { + w.fairQueuePriority = nil + } } }() } } +func (w *priorityqueue[T]) maybeAddToFairQueueLocked(item *item[T]) { + if w.fairQueuePriority == nil || + *w.fairQueuePriority != item.Priority || + (item.ReadyAt != nil && item.ReadyAt.Sub(w.now()) > 0) || + w.locked.Has(item.Key) { + return + } + + w.fairQueue.add(item, w.extractFairnessDimensions(item.Key)...) +} + func (w *priorityqueue[T]) Add(item T) { w.AddWithOpts(AddOpts{}, item) } @@ -310,10 +379,22 @@ func (w *priorityqueue[T]) ShuttingDown() bool { } func (w *priorityqueue[T]) Done(item T) { - w.lockedLock.Lock() - defer w.lockedLock.Unlock() + w.lock.Lock() + defer w.lock.Unlock() w.locked.Delete(item) w.metrics.done(item) + + // Update the fairqueue if the item is waiting + if w.fairQueuePriority != nil && w.items[item] != nil && (w.items[item].ReadyAt == nil || w.items[item].ReadyAt.Sub(w.now()) <= 0) { + if *w.fairQueuePriority == w.items[item].Priority { + // We can just insert as the fairQueue sorts by addedCounter + w.fairQueue.add(w.items[item], w.extractFairnessDimensions(item)...) + } else if *w.fairQueuePriority < w.items[item].Priority { + w.fairQueue.drain() + w.fairQueuePriority = ptr.To(w.items[item].Priority) + w.fairQueue.add(w.items[item], w.extractFairnessDimensions(item)...) + } + } w.notifyItemOrWaiterAdded() } @@ -415,3 +496,10 @@ type bTree[T any] interface { Delete(item T) (T, bool) Ascend(iterator btree.ItemIteratorG[T]) } + +type fairQueueInterface[T comparable] interface { + add(i *item[T], dimensions ...string) + dequeue() (*item[T], bool) + drain() + isEmpty() bool +} diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index 7e5d3ba3ed..449125ea7a 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -3,6 +3,7 @@ package priorityqueue import ( "fmt" "math/rand/v2" + "strings" "sync" "testing" "time" @@ -85,6 +86,31 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.adds["test"]).To(Equal(4)) }) + It("returns items fairly when configured to", func() { + q, metrics := newQueue(func(o *Opts[string]) { + o.FairnessDimensionsExtractor = func(s string) []string { + return []string{strings.Split(s, "-")[0]} + } + }) + defer q.ShutDown() + + q.AddWithOpts(AddOpts{}, "foo-1") + q.AddWithOpts(AddOpts{}, "foo-2") + q.AddWithOpts(AddOpts{}, "bar-1") + + item, _, _ := q.GetWithPriority() + Expect(item).To(Equal("bar-1")) + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("foo-1")) + item, _, _ = q.GetWithPriority() + Expect(item).To(Equal("foo-2")) + + cwq := q.(*priorityqueue[string]) + cwq.lock.Lock() + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) + Expect(metrics.adds["test"]).To(Equal(3)) + }) + It("de-duplicates items", func() { q, metrics := newQueue() defer q.ShutDown() @@ -95,7 +121,7 @@ var _ = Describe("Controllerworkqueue", func() { Consistently(q.Len).Should(Equal(1)) cwq := q.(*priorityqueue[string]) - cwq.lockedLock.Lock() + cwq.lock.Lock() Expect(cwq.locked.Len()).To(Equal(0)) Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) @@ -196,6 +222,71 @@ var _ = Describe("Controllerworkqueue", func() { Expect(metrics.retries["test"]).To(Equal(1)) }) + It("returns items from multiple priorities to waiting waiters once they are ready", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + Expect(d).To(Equal(time.Second)) + return tick + } + + retrievedFoo := make(chan struct{}) + retrievedBar := make(chan struct{}) + + go func() { + defer GinkgoRecover() + item, _, _ := q.GetWithPriority() + switch item { + case "foo": + close(retrievedFoo) + case "bar": + close(retrievedBar) + default: + panic(fmt.Sprintf("unexpected item %s", item)) + } + }() + go func() { + defer GinkgoRecover() + item, _, _ := q.GetWithPriority() + switch item { + case "foo": + close(retrievedFoo) + case "bar": + close(retrievedBar) + default: + panic(fmt.Sprintf("unexpected item %s", item)) + } + }() + + q.AddWithOpts(AddOpts{After: time.Second, Priority: 1}, "foo") + q.AddWithOpts(AddOpts{After: time.Second, Priority: 2}, "bar") + + Consistently(retrievedFoo).ShouldNot(BeClosed()) + Consistently(retrievedBar).ShouldNot(BeClosed()) + + nowLock.Lock() + now = now.Add(time.Second) + nowLock.Unlock() + tick <- now + Eventually(retrievedBar).Should(BeClosed()) + Eventually(retrievedFoo).Should(BeClosed()) + + Expect(metrics.depth["test"]).To(Equal(map[int]int{1: 0, 2: 0})) + Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.retries["test"]).To(Equal(2)) + }) + It("returns an item to a waiter as soon as it has one", func() { q, metrics := newQueue() defer q.ShutDown() @@ -691,14 +782,18 @@ func TestFuzzPriorityQueue(t *testing.T) { wg.Wait() } -func newQueue() (PriorityQueue[string], *fakeMetricsProvider) { +func newQueue(opts ...Opt[string]) (PriorityQueue[string], *fakeMetricsProvider) { metrics := newFakeMetricsProvider() - q := New("test", func(o *Opts[string]) { + q := New("test", append(opts, func(o *Opts[string]) { o.MetricProvider = metrics - }) + })...) q.(*priorityqueue[string]).queue = &btreeInteractionValidator{ bTree: q.(*priorityqueue[string]).queue, } + q.(*priorityqueue[string]).fairQueue = &fairQueueInteractionValidator[string]{ + fairQueue: q.(*priorityqueue[string]).fairQueue, + pq: q.(*priorityqueue[string]), + } // validate that tick always gets a positive value as it will just return // nil otherwise, which results in blocking forever. @@ -720,16 +815,80 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s // There is no codepath that updates an item item, alreadyExist := b.bTree.ReplaceOrInsert(item) if alreadyExist { + defer GinkgoRecover() panic(fmt.Sprintf("ReplaceOrInsert: item %v already existed", item)) } return item, alreadyExist } -func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) { +func (b *btreeInteractionValidator) Delete(element *item[string]) (*item[string], bool) { // There is no codepath that deletes an item that doesn't exist - old, existed := b.bTree.Delete(item) + old, existed := b.bTree.Delete(element) if !existed { - panic(fmt.Sprintf("Delete: item %v not found", item)) + var items []item[string] + b.bTree.Ascend(func(item *item[string]) bool { + items = append(items, *item) + return true + }) + defer GinkgoRecover() + panic(fmt.Sprintf("Delete: item %v not found, items: %v", element, items)) } return old, existed } + +type fairQueueInteractionValidator[T comparable] struct { + fairQueue fairQueueInterface[T] + pq *priorityqueue[T] + allKeys sets.Set[T] +} + +func (f *fairQueueInteractionValidator[T]) add(item *item[T], dimensions ...string) { + if f.pq.fairQueuePriority == nil { + defer GinkgoRecover() + panic(fmt.Sprintf("add: fairQueuePriority is nil, item: %v", item)) + } + + if item.Priority != *f.pq.fairQueuePriority { + defer GinkgoRecover() + panic(fmt.Sprintf("add: item priority %d does not match fair queue priority %d", item.Priority, *f.pq.fairQueuePriority)) + } + + if f.pq.locked.Has(item.Key) { + defer GinkgoRecover() + panic(fmt.Sprintf("add: item %v is locked but got added to fair queue", item.Key)) + } + + if item.ReadyAt != nil && item.ReadyAt.After(f.pq.now()) { + defer GinkgoRecover() + panic(fmt.Sprintf("add: item %v has ReadyAt %v in the future", item.Key, item.ReadyAt)) + } + + if f.allKeys == nil { + f.allKeys = sets.New[T]() + } + if f.allKeys.Has(item.Key) { + defer GinkgoRecover() + panic(fmt.Sprintf("add: item with key %v already exists in fair queue", item.Key)) + } + f.allKeys.Insert(item.Key) + + f.fairQueue.add(item, dimensions...) +} + +func (f *fairQueueInteractionValidator[T]) dequeue() (*item[T], bool) { + item, exists := f.fairQueue.dequeue() + if exists { + f.allKeys.Delete(item.Key) + } + + return item, exists +} + +func (f *fairQueueInteractionValidator[T]) drain() { + f.allKeys = nil + f.fairQueue.drain() +} + +func (f *fairQueueInteractionValidator[T]) isEmpty() bool { + return f.fairQueue.isEmpty() +}