From a5e0a2c3d761d029c2337ff23fe78fea71f52989 Mon Sep 17 00:00:00 2001
From: Alvaro Aleman
Date: Thu, 17 Jul 2025 11:53:26 +0200
Subject: [PATCH] :sparkles: Priorityqueue: Optionally return items within a
 priority fairly

This change makes it possible for the priorityqueue to return items
fairly across one or more fairness dimensions. These fairness dimensions
are extracted from items using a configurable extractor func. Fairness
is only ensured within a given priority, as the purpose of priorities is
to return higher-priority items before lower-priority items.

An example use-case is to use the namespace as fairness dimension, which
ensures that one very busy namespace cannot starve other namespaces. An
example use-case for more than one dimension is multi-cluster
controllers, where fairness should first be achieved across clusters and
then across namespaces within each cluster.

By default, no extractor func is configured, so this change does not
alter any behavior. We can revisit this default in the future.

Naively, the whole problem looks like it could be solved by sorting the
items in the queue appropriately, but that doesn't really work because:
* Most (all?) sorting algorithms assume that the order of two elements
  can be determined by looking at just those two elements, but for
  fairness we also have to consider the items ahead of the current
  element as well as the items we have already handed out
* We cannot hand out locked items and have to ignore them for the
  purpose of fairness. This in turn means that whenever an item gets
  unlocked, we would have to redo our sorting

This change solves the problem by introducing a `fairQueue` structure
which holds all items that are ready to be handed out and in the highest
priority bracket that has ready items. The priorityqueue then hands out
items from its internal `fairQueue` and manages its content.

The `fairQueue` works by keeping a slice of all the fairness dimensions
it has ever observed and a map of fairness dimension to nested
`fairQueue`, with the items themselves stored at the leaf level.
Whenever it hands out an item, it iterates over this slice and checks
the corresponding map until it finds an item to hand out.
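To illustrate (this snippet is not part of the change itself), a
consumer could opt into namespace fairness roughly like the sketch
below. `New`, `Opts` and `reconcile.Request` are existing
controller-runtime types; everything else is hypothetical:

    import (
        "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
    )

    // A queue that is fair across namespaces: items are still returned
    // strictly by priority, but within one priority the queue
    // round-robins over the extracted dimensions.
    q := priorityqueue.New("example", func(o *priorityqueue.Opts[reconcile.Request]) {
        o.FairnessDimensionsExtractor = func(r reconcile.Request) []string {
            // A multi-cluster controller could instead return
            // []string{clusterOf(r), r.Namespace}, with clusterOf
            // being a hypothetical helper.
            return []string{r.Namespace}
        }
    })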
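To sketch the resulting behavior, assume string items where the first
dash-separated segment is the only fairness dimension (the ordering
below follows the fairqueue tests added in this change):

    // Imports as above, plus "strings".
    q := priorityqueue.New("demo", func(o *priorityqueue.Opts[string]) {
        o.FairnessDimensionsExtractor = func(s string) []string {
            return []string{strings.Split(s, "-")[0]}
        }
    })
    q.Add("busy-1")
    q.Add("busy-2")
    q.Add("busy-3")
    q.Add("quiet-1")

    // Without an extractor, the items would be handed out in insertion
    // order: busy-1, busy-2, busy-3, quiet-1. With it, dequeueing
    // round-robins across the busy and quiet dimensions, so the
    // expected order is quiet-1, busy-1, busy-2, busy-3 and the quiet
    // namespace is not starved behind the busy one.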
---
 .golangci.yml                                 |   6 +
 pkg/controller/priorityqueue/fairqueue.go     |  97 ++++++++++
 .../priorityqueue/fairqueue_test.go           |  87 +++++++++
 pkg/controller/priorityqueue/priorityqueue.go | 152 +++++++++++----
 .../priorityqueue/priorityqueue_test.go       | 173 +++++++++++++++++-
 5 files changed, 476 insertions(+), 39 deletions(-)
 create mode 100644 pkg/controller/priorityqueue/fairqueue.go
 create mode 100644 pkg/controller/priorityqueue/fairqueue_test.go

diff --git a/.golangci.yml b/.golangci.yml
index 7390d2024b..a8ca869d13 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -158,6 +158,12 @@ linters:
         # used through an interface and not directly..?
         # Likely same issue as https://github.com/dominikh/go-tools/issues/1616
         path: pkg/controller/priorityqueue/metrics\.go
+      - linters:
+          - unused
+        # Seems to incorrectly trigger on the two implementations that are only
+        # used through an interface and not directly..?
+        # Likely same issue as https://github.com/dominikh/go-tools/issues/1616
+        path: pkg/controller/priorityqueue/priorityqueue_test\.go
       # The following are being worked on to remove their exclusion. This list should be reduced or go away all together over time.
       # If it is decided they will not be addressed they should be moved above this comment.
       - path: (.+)\.go$
diff --git a/pkg/controller/priorityqueue/fairqueue.go b/pkg/controller/priorityqueue/fairqueue.go
new file mode 100644
index 0000000000..e1dc146ff4
--- /dev/null
+++ b/pkg/controller/priorityqueue/fairqueue.go
@@ -0,0 +1,97 @@
+package priorityqueue
+
+import (
+	"slices"
+
+	"k8s.io/utils/ptr"
+)
+
+type fairq[T comparable] struct {
+	children map[string]*fairq[T]
+	items    []*item[T]
+	// order is the order in which to dequeue. Nil is used
+	// as a magic value to indicate we should dequeue from
+	// items instead of a child next.
+	order []*string
+}
+
+func (q *fairq[T]) add(i *item[T], dimensions ...string) {
+	if len(dimensions) == 0 {
+		if len(q.order) == len(q.children) {
+			q.order = append([]*string{nil}, q.order...)
+		}
+		q.items = append(q.items, i)
+
+		// Sort here so items that get unlocked can be added, rather
+		// than having to completely drain and rebuild the fairQueue.
+		slices.SortFunc(q.items, func(a, b *item[T]) int {
+			if less(a, b) {
+				return -1
+			}
+			return 1
+		})
+
+		return
+	}
+
+	dimension := dimensions[0]
+	dimensions = dimensions[1:]
+
+	if q.children == nil {
+		q.children = make(map[string]*fairq[T], 1)
+	}
+
+	_, exists := q.children[dimension]
+	if !exists {
+		q.children[dimension] = &fairq[T]{}
+		q.order = append([]*string{ptr.To(dimension)}, q.order...)
+	}
+
+	q.children[dimension].add(i, dimensions...)
+}
+
+func (q *fairq[T]) dequeue() (*item[T], bool) {
+	var item *item[T]
+	var hasItem bool
+
+	for idx, dimension := range q.order {
+		switch {
+		case dimension != nil: // child element
+			item, hasItem = q.children[*dimension].dequeue()
+			if !hasItem {
+				continue
+			}
+		case len(q.items) > 0: // leaf element
+			item = q.items[0]
+			q.items = q.items[1:]
+		default: // no items for current dimension, check next
+			continue
+		}
+
+		q.order = append(q.order[:idx], q.order[idx+1:]...)
+		q.order = append(q.order, dimension)
+
+		return item, true
+	}
+
+	return item, false
+}
+
+func (q *fairq[T]) drain() {
+	for _, child := range q.children {
+		child.drain()
+	}
+	q.items = nil
+}
+
+func (q *fairq[T]) isEmpty() bool {
+	if len(q.items) > 0 {
+		return false
+	}
+	for _, child := range q.children {
+		if !child.isEmpty() {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/controller/priorityqueue/fairqueue_test.go b/pkg/controller/priorityqueue/fairqueue_test.go
new file mode 100644
index 0000000000..f9173863c3
--- /dev/null
+++ b/pkg/controller/priorityqueue/fairqueue_test.go
@@ -0,0 +1,87 @@
+package priorityqueue
+
+import (
+	"strings"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("FairQueue", func() {
+	DescribeTable("Ordering tests",
+		Entry("simple",
+			[]string{"foo", "foo", "bar"},
+			[]string{"bar", "foo", "foo"},
+		),
+		Entry("balanced",
+			[]string{"foo", "foo", "bar", "bar"},
+			[]string{"bar", "foo", "bar", "foo"},
+		),
+		Entry("mixed-dimensional, multi-dimensional comes first",
+			[]string{"foo-foo", "foo-foo", "foo"},
+			[]string{"foo", "foo-foo", "foo-foo"},
+		),
+		Entry("mixed-dimensional, single dimension comes first",
+			[]string{"foo", "foo-foo", "foo-foo"},
+			[]string{"foo-foo", "foo", "foo-foo"},
+		),
+		Entry("complex mixed-dimensional",
+			[]string{"bar", "foo", "foo-foo", "foo-foo", "foo-bar"},
+			// First the foo with the latest-added second dimension, then the bar,
+			// then the remaining foos based on the second dimension.
+			[]string{"foo-bar", "bar", "foo-foo", "foo", "foo-foo"},
+		),
+
+		func(adds []string, expected []string) {
+			q := &fairq[string]{}
+			for idx, add := range adds {
+				q.add(&item[string]{Key: add, AddedCounter: uint64(idx)}, strings.Split(add, "-")...)
+			}
+
+			actual := make([]string, 0, len(expected))
+			for range len(expected) {
+				item, didGetItem := q.dequeue()
+				Expect(didGetItem).To(BeTrue())
+				actual = append(actual, item.Key)
+			}
+
+			_, didGetItem := q.dequeue()
+			Expect(didGetItem).To(BeFalse())
+			Expect(actual).To(Equal(expected))
+		},
+	)
+
+	It("retains fairness across queue operations", func() {
+		q := &fairq[string]{}
+		q.add(&item[string]{Key: "foo"}, "foo")
+		_, _ = q.dequeue()
+		q.add(&item[string]{Key: "bar", AddedCounter: 1}, "bar")
+		q.add(&item[string]{Key: "foo", AddedCounter: 2}, "foo")
+
+		item, _ := q.dequeue()
+		Expect(item.Key).To(Equal("bar"))
+		Expect(q.isEmpty()).To(BeFalse())
+	})
+
+	It("sorts by added counter", func() {
+		q := &fairq[string]{}
+		q.add(&item[string]{Key: "foo", AddedCounter: 2})
+		q.add(&item[string]{Key: "bar", AddedCounter: 1})
+
+		item, _ := q.dequeue()
+		Expect(item.Key).To(Equal("bar"))
+		Expect(q.isEmpty()).To(BeFalse())
+	})
+
+	It("drains all items", func() {
+		q := &fairq[string]{}
+		q.add(&item[string]{Key: "foo"}, "foo")
+		q.add(&item[string]{Key: "bar"}, "bar")
+		q.add(&item[string]{Key: "baz"}, "baz")
+
+		q.drain()
+		_, gotItem := q.dequeue()
+		Expect(gotItem).To(BeFalse())
+		Expect(q.isEmpty()).To(BeTrue())
+	})
+})
diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go
index 1f1a245849..f0af3efbe6 100644
--- a/pkg/controller/priorityqueue/priorityqueue.go
+++ b/pkg/controller/priorityqueue/priorityqueue.go
@@ -38,7 +38,18 @@ type Opts[T comparable] struct {
 	// limiter with an initial delay of five milliseconds and a max delay of 1000 seconds.
 	RateLimiter    workqueue.TypedRateLimiter[T]
 	MetricProvider workqueue.MetricsProvider
-	Log            logr.Logger
+	// FairnessDimensionsExtractor can be configured to make the PriorityQueue return items
+	// with the same priority fairly based on the returned dimensions. This could for
+	// example be the item's Namespace. Doing this ensures that one Namespace with a lot
+	// of events cannot starve other Namespaces.
+	//
+	// If more than one dimension is returned, fairness is first ensured within the first
+	// dimension, then within the second dimension and so on.
+	//
+	// If you want the opposite, i.e. to explicitly prioritize specific events, call
+	// AddWithOpts with a higher priority.
+	FairnessDimensionsExtractor func(T) []string
+	Log                         logr.Logger
 }
 
 // Opt allows to configure a PriorityQueue.
@@ -59,12 +70,18 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
 		opts.MetricProvider = metrics.WorkqueueMetricsProvider{}
 	}
 
+	if opts.FairnessDimensionsExtractor == nil {
+		opts.FairnessDimensionsExtractor = func(T) []string { return nil }
+	}
+
 	pq := &priorityqueue[T]{
-		log:         opts.Log,
-		items:       map[T]*item[T]{},
-		queue:       btree.NewG(32, less[T]),
-		becameReady: sets.Set[T]{},
-		metrics:     newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
+		log:                       opts.Log,
+		items:                     map[T]*item[T]{},
+		queue:                     btree.NewG(32, less[T]),
+		fairQueue:                 &fairq[T]{},
+		extractFairnessDimensions: opts.FairnessDimensionsExtractor,
+		becameReady:               sets.Set[T]{},
+		metrics:                   newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
 		// itemOrWaiterAdded indicates that an item or
 		// waiter was added. It must be buffered, because
 		// if we currently process items we can't tell
@@ -89,19 +106,34 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
 
 type priorityqueue[T comparable] struct {
 	log logr.Logger
-	// lock has to be acquired for any access any of items, queue, addedCounter
-	// or becameReady
+	// lock has to be acquired before accessing any of items,
+	// queue, fairQueue, addedCounter, becameReady or
+	// locked.
 	lock  sync.Mutex
 	items map[T]*item[T]
 	queue bTree[*item[T]]
+
+	// fairQueue ensures that the items we hand out are fair across
+	// configurable dimensions, for example the namespace.
+	// It holds all items that are ready to be handed out and in the
+	// highest priority bracket that has ready items, as well as state
+	// to determine the fairness.
+	fairQueue fairQueueInterface[T]
+
+	// fairQueuePriority is the priority of the items in the fairQueue.
+	// It may be nil, indicating the fairQueuePriority is currently
+	// unknown.
+	fairQueuePriority *int
+
+	extractFairnessDimensions func(item T) []string
+
 	// addedCounter is a counter of elements added, we need it
 	// because unixNano is not guaranteed to be unique.
 	addedCounter uint64
 
 	// becameReady holds items that are in the queue, were added
-	// with non-zero after and became ready. We need it to call the
-	// metrics add exactly once for them.
+	// with non-zero after and became ready. We need it to call
+	// metrics.add and fairQueue.add exactly once for them.
 	becameReady sets.Set[T]
 
 	metrics queueMetrics[T]
@@ -111,8 +143,7 @@ type priorityqueue[T comparable] struct {
 
 	// locked contains the keys we handed out through Get() and that haven't
 	// yet been returned through Done().
-	locked     sets.Set[T]
-	lockedLock sync.RWMutex
+	locked sets.Set[T]
 
 	shutdown atomic.Bool
 	done     chan struct{}
@@ -150,6 +181,12 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
 			readyAt = ptr.To(w.now().Add(after))
 			w.metrics.retry()
 		}
+
+		if readyAt == nil && !w.locked.Has(key) && w.fairQueuePriority != nil && *w.fairQueuePriority < o.Priority {
+			w.fairQueue.drain()
+			w.fairQueuePriority = nil // Leave nil here in case something at that priority becomes ready in parallel
+		}
+
 		if _, ok := w.items[key]; !ok {
 			item := &item[T]{
 				Key: key,
@@ -161,6 +198,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
 			w.queue.ReplaceOrInsert(item)
 			if item.ReadyAt == nil {
 				w.metrics.add(key, item.Priority)
+				w.maybeAddToFairQueueLocked(item)
 			}
 			w.addedCounter++
 			continue
@@ -175,11 +213,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
 				w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
 			}
 			item.Priority = o.Priority
+
+			w.maybeAddToFairQueueLocked(item)
 		}
 
 		if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
 			if readyAt == nil && !w.becameReady.Has(key) {
 				w.metrics.add(key, item.Priority)
+				w.maybeAddToFairQueueLocked(item)
 			}
 			item.ReadyAt = readyAt
 		}
@@ -218,12 +259,8 @@ func (w *priorityqueue[T]) spin() {
 			w.lock.Lock()
 			defer w.lock.Unlock()
 
-			w.lockedLock.Lock()
-			defer w.lockedLock.Unlock()
-
-			// manipulating the tree from within Ascend might lead to panics, so
-			// track what we want to delete and do it after we are done ascending.
-			var toDelete []*item[T]
+		fairQueuePopulating:
+			fairQueueNeedsPopulating := w.fairQueuePriority == nil
 			w.queue.Ascend(func(item *item[T]) bool {
 				if item.ReadyAt != nil {
 					if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
@@ -233,38 +270,70 @@ func (w *priorityqueue[T]) spin() {
 					if !w.becameReady.Has(item.Key) {
 						w.metrics.add(item.Key, item.Priority)
 						w.becameReady.Insert(item.Key)
+						if !fairQueueNeedsPopulating {
+							w.maybeAddToFairQueueLocked(item)
+						}
 					}
 				}
 
-				if w.waiters.Load() == 0 {
-					// Have to keep iterating here to ensure we update metrics
-					// for further items that became ready and set nextReady.
-					return true
-				}
-
 				// Item is locked, we can not hand it out
 				if w.locked.Has(item.Key) {
 					return true
 				}
 
+				if w.fairQueuePriority == nil {
+					w.fairQueuePriority = ptr.To(item.Priority)
+				}
+
+				if fairQueueNeedsPopulating {
+					w.maybeAddToFairQueueLocked(item)
+				}
+
+				return true
+			})
+
+			if w.fairQueue.isEmpty() {
+				return
+			}
+
+			for w.waiters.Load() > 0 {
+				item, hasItem := w.fairQueue.dequeue()
+				if !hasItem {
+					w.fairQueuePriority = nil
+					if w.waiters.Load() > 0 {
+						// There could be ready items that have a lower priority
+						goto fairQueuePopulating
+					}
+					break
+				}
+
 				w.metrics.get(item.Key, item.Priority)
 				w.locked.Insert(item.Key)
 				w.waiters.Add(-1)
 				delete(w.items, item.Key)
-				toDelete = append(toDelete, item)
 				w.becameReady.Delete(item.Key)
 				w.get <- *item
-
-				return true
-			})
-
-			for _, item := range toDelete {
 				w.queue.Delete(item)
+
+				if w.fairQueue.isEmpty() {
+					w.fairQueuePriority = nil
+				}
 			}
 		}()
 	}
 }
 
+func (w *priorityqueue[T]) maybeAddToFairQueueLocked(item *item[T]) {
+	if w.fairQueuePriority == nil ||
+		*w.fairQueuePriority != item.Priority ||
+		(item.ReadyAt != nil && item.ReadyAt.Sub(w.now()) > 0) ||
+		w.locked.Has(item.Key) {
+		return
+	}
+
+	w.fairQueue.add(item, w.extractFairnessDimensions(item.Key)...)
+}
+
 func (w *priorityqueue[T]) Add(item T) {
 	w.AddWithOpts(AddOpts{}, item)
 }
@@ -310,10 +379,22 @@ func (w *priorityqueue[T]) ShuttingDown() bool {
 }
 
 func (w *priorityqueue[T]) Done(item T) {
-	w.lockedLock.Lock()
-	defer w.lockedLock.Unlock()
+	w.lock.Lock()
+	defer w.lock.Unlock()
 	w.locked.Delete(item)
 	w.metrics.done(item)
+
+	// Update the fairQueue if the item is waiting
+	if w.fairQueuePriority != nil && w.items[item] != nil && (w.items[item].ReadyAt == nil || w.items[item].ReadyAt.Sub(w.now()) <= 0) {
+		if *w.fairQueuePriority == w.items[item].Priority {
+			// We can just insert, as the fairQueue sorts by addedCounter
+			w.fairQueue.add(w.items[item], w.extractFairnessDimensions(item)...)
+		} else if *w.fairQueuePriority < w.items[item].Priority {
+			w.fairQueue.drain()
+			w.fairQueuePriority = ptr.To(w.items[item].Priority)
+			w.fairQueue.add(w.items[item], w.extractFairnessDimensions(item)...)
+		}
+	}
 	w.notifyItemOrWaiterAdded()
 }
@@ -415,3 +496,10 @@ type bTree[T any] interface {
 	Delete(item T) (T, bool)
 	Ascend(iterator btree.ItemIteratorG[T])
 }
+
+type fairQueueInterface[T comparable] interface {
+	add(i *item[T], dimensions ...string)
+	dequeue() (*item[T], bool)
+	drain()
+	isEmpty() bool
+}
diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go
index 7e5d3ba3ed..449125ea7a 100644
--- a/pkg/controller/priorityqueue/priorityqueue_test.go
+++ b/pkg/controller/priorityqueue/priorityqueue_test.go
@@ -3,6 +3,7 @@ package priorityqueue
 import (
 	"fmt"
 	"math/rand/v2"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -85,6 +86,31 @@ var _ = Describe("Controllerworkqueue", func() {
 		Expect(metrics.adds["test"]).To(Equal(4))
 	})
 
+	It("returns items fairly when configured to", func() {
+		q, metrics := newQueue(func(o *Opts[string]) {
+			o.FairnessDimensionsExtractor = func(s string) []string {
+				return []string{strings.Split(s, "-")[0]}
+			}
+		})
+		defer q.ShutDown()
+
+		q.AddWithOpts(AddOpts{}, "foo-1")
+		q.AddWithOpts(AddOpts{}, "foo-2")
+		q.AddWithOpts(AddOpts{}, "bar-1")
+
+		item, _, _ := q.GetWithPriority()
+		Expect(item).To(Equal("bar-1"))
+		item, _, _ = q.GetWithPriority()
+		Expect(item).To(Equal("foo-1"))
+		item, _, _ = q.GetWithPriority()
+		Expect(item).To(Equal("foo-2"))
+
+		cwq := q.(*priorityqueue[string])
+		cwq.lock.Lock()
+		Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0}))
+		Expect(metrics.adds["test"]).To(Equal(3))
+	})
+
 	It("de-duplicates items", func() {
 		q, metrics := newQueue()
 		defer q.ShutDown()
@@ -95,7 +121,7 @@ var _ = Describe("Controllerworkqueue", func() {
 		Consistently(q.Len).Should(Equal(1))
 
 		cwq := q.(*priorityqueue[string])
-		cwq.lockedLock.Lock()
+		cwq.lock.Lock()
 		Expect(cwq.locked.Len()).To(Equal(0))
 		Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1}))
@@ -196,6 +222,71 @@ var _ = Describe("Controllerworkqueue", func() {
 		Expect(metrics.retries["test"]).To(Equal(1))
 	})
 
+	It("returns items from multiple priorities to waiting waiters once they are ready", func() {
+		q, metrics := newQueue()
+		defer q.ShutDown()
+
+		now := time.Now().Round(time.Second)
+		nowLock := sync.Mutex{}
+		tick := make(chan time.Time)
+
+		cwq := q.(*priorityqueue[string])
+		cwq.now = func() time.Time {
+			nowLock.Lock()
+			defer nowLock.Unlock()
+			return now
+		}
+		cwq.tick = func(d time.Duration) <-chan time.Time {
+			Expect(d).To(Equal(time.Second))
+			return tick
+		}
+
+		retrievedFoo := make(chan struct{})
+		retrievedBar := make(chan struct{})
+
+		go func() {
+			defer GinkgoRecover()
+			item, _, _ := q.GetWithPriority()
+			switch item {
+			case "foo":
+				close(retrievedFoo)
+			case "bar":
+				close(retrievedBar)
+			default:
+				panic(fmt.Sprintf("unexpected item %s", item))
+			}
+		}()
+		go func() {
+			defer GinkgoRecover()
+			item, _, _ := q.GetWithPriority()
+			switch item {
+			case "foo":
+				close(retrievedFoo)
+			case "bar":
+				close(retrievedBar)
+			default:
+				panic(fmt.Sprintf("unexpected item %s", item))
+			}
+		}()
+
+		q.AddWithOpts(AddOpts{After: time.Second, Priority: 1}, "foo")
+		q.AddWithOpts(AddOpts{After: time.Second, Priority: 2}, "bar")
+
+		Consistently(retrievedFoo).ShouldNot(BeClosed())
+		Consistently(retrievedBar).ShouldNot(BeClosed())
+
+		nowLock.Lock()
+		now = now.Add(time.Second)
+		nowLock.Unlock()
+		tick <- now
+
+		Eventually(retrievedBar).Should(BeClosed())
+		Eventually(retrievedFoo).Should(BeClosed())
+
+		Expect(metrics.depth["test"]).To(Equal(map[int]int{1: 0, 2: 0}))
+		Expect(metrics.adds["test"]).To(Equal(2))
+		Expect(metrics.retries["test"]).To(Equal(2))
+	})
+
 	It("returns an item to a waiter as soon as it has one", func() {
 		q, metrics := newQueue()
 		defer q.ShutDown()
@@ -691,14 +782,18 @@ func TestFuzzPriorityQueue(t *testing.T) {
 	wg.Wait()
 }
 
-func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
+func newQueue(opts ...Opt[string]) (PriorityQueue[string], *fakeMetricsProvider) {
 	metrics := newFakeMetricsProvider()
-	q := New("test", func(o *Opts[string]) {
+	q := New("test", append(opts, func(o *Opts[string]) {
 		o.MetricProvider = metrics
-	})
+	})...)
 	q.(*priorityqueue[string]).queue = &btreeInteractionValidator{
 		bTree: q.(*priorityqueue[string]).queue,
 	}
+	q.(*priorityqueue[string]).fairQueue = &fairQueueInteractionValidator[string]{
+		fairQueue: q.(*priorityqueue[string]).fairQueue,
+		pq:        q.(*priorityqueue[string]),
+	}
 
 	// validate that tick always gets a positive value as it will just return
 	// nil otherwise, which results in blocking forever.
@@ -720,16 +815,80 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s
 	// There is no codepath that updates an item
 	item, alreadyExist := b.bTree.ReplaceOrInsert(item)
 	if alreadyExist {
+		defer GinkgoRecover()
 		panic(fmt.Sprintf("ReplaceOrInsert: item %v already existed", item))
 	}
 	return item, alreadyExist
 }
 
-func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) {
+func (b *btreeInteractionValidator) Delete(element *item[string]) (*item[string], bool) {
 	// There is no codepath that deletes an item that doesn't exist
-	old, existed := b.bTree.Delete(item)
+	old, existed := b.bTree.Delete(element)
 	if !existed {
-		panic(fmt.Sprintf("Delete: item %v not found", item))
+		var items []item[string]
+		b.bTree.Ascend(func(item *item[string]) bool {
+			items = append(items, *item)
+			return true
+		})
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("Delete: item %v not found, items: %v", element, items))
 	}
 	return old, existed
 }
+
+type fairQueueInteractionValidator[T comparable] struct {
+	fairQueue fairQueueInterface[T]
+	pq        *priorityqueue[T]
+	allKeys   sets.Set[T]
+}
+
+func (f *fairQueueInteractionValidator[T]) add(item *item[T], dimensions ...string) {
+	if f.pq.fairQueuePriority == nil {
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("add: fairQueuePriority is nil, item: %v", item))
+	}
+
+	if item.Priority != *f.pq.fairQueuePriority {
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("add: item priority %d does not match fair queue priority %d", item.Priority, *f.pq.fairQueuePriority))
+	}
+
+	if f.pq.locked.Has(item.Key) {
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("add: item %v is locked but got added to fair queue", item.Key))
+	}
+
+	if item.ReadyAt != nil && item.ReadyAt.After(f.pq.now()) {
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("add: item %v has ReadyAt %v in the future", item.Key, item.ReadyAt))
+	}
+
+	if f.allKeys == nil {
+		f.allKeys = sets.New[T]()
+	}
+	if f.allKeys.Has(item.Key) {
+		defer GinkgoRecover()
+		panic(fmt.Sprintf("add: item with key %v already exists in fair queue", item.Key))
+	}
+	f.allKeys.Insert(item.Key)
+
+	f.fairQueue.add(item, dimensions...)
+}
+
+func (f *fairQueueInteractionValidator[T]) dequeue() (*item[T], bool) {
+	item, exists := f.fairQueue.dequeue()
+	if exists {
+		f.allKeys.Delete(item.Key)
+	}
+
+	return item, exists
+}
+
+func (f *fairQueueInteractionValidator[T]) drain() {
+	f.allKeys = nil
+	f.fairQueue.drain()
+}
+
+func (f *fairQueueInteractionValidator[T]) isEmpty() bool {
+	return f.fairQueue.isEmpty()
+}