Skip to content

Commit 5f9e8bd

Browse files
committed
✨ Priorityqueue: Optionally return items fairly
This change makes it possible for the priorityqueue to return items fairly across one or more fairness dimensions. These fairness dimensions are extracted from items using a configurable extractor func. Fairness is only ensured within a given priority, as the purpose of priorities is to return higher-priority items before lower-priority items. An example use-case is to use the namespace as fairness dimension, which then ensures that one very busy namespace can not starve off other namespaces. An example use-case for more than one dimension are multi-cluster controllers, where fairness should first be achieved across clusters and then across namespaces within each cluster. By default, no extractor func is configured so this doesn't change any behavior. We can revisit this in the future. Naively, the whole problem looks like it could be solved by sorting the items in the queue appropriately, but that doesn't really work because: * Most (all?) sorting algorithm assume that the order of two elements can be determined just by looking at those two elements, but for fairness we have to consider the items in front of the current elements as well as the items that we already handed out * We can not hand out locked items and have to ignore them for the purpose of fairness. This in turn means that whenever an item gets unlocked, we'd have to redo our sorting This change solves the problem by introducing a `fairQueue` structure which holds all items that are ready to be handed out and in the highest priority bracket that has ready items. The priorityqueue then hands out items from its internal `fairQueue` and manages its content. The `fairQueue` works by using a slice of all the fairness dinensions it has ever observed and a map of fairness dimension to item slice. Whenever it hands out an item, it iterates over this slice and checks the corresponding map for items until it found an item to hand out.
1 parent e08f24a commit 5f9e8bd

File tree

4 files changed

+470
-39
lines changed

4 files changed

+470
-39
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package priorityqueue
2+
3+
import (
4+
"slices"
5+
6+
"k8s.io/utils/ptr"
7+
)
8+
9+
type fairq[T comparable] struct {
10+
children map[string]*fairq[T]
11+
items []*item[T]
12+
// order is the order in which to dequeue. Nil is used
13+
// as a magic value to indicate we should dequeue from
14+
// items instead of a child next.
15+
order []*string
16+
}
17+
18+
func (q *fairq[T]) add(i *item[T], dimensions ...string) {
19+
if len(dimensions) == 0 {
20+
if len(q.order) == len(q.children) {
21+
q.order = append([]*string{nil}, q.order...)
22+
}
23+
q.items = append(q.items, i)
24+
25+
// Sort here so items that get unlocked can be added, rather
26+
// than having to completely drain and rebuild the fairQueue.
27+
slices.SortFunc(q.items, func(a, b *item[T]) int {
28+
if less(a, b) {
29+
return -1
30+
}
31+
return 1
32+
})
33+
34+
return
35+
}
36+
37+
dimension := dimensions[0]
38+
dimensions = dimensions[1:]
39+
40+
if q.children == nil {
41+
q.children = make(map[string]*fairq[T], 1)
42+
}
43+
44+
_, exists := q.children[dimension]
45+
if !exists {
46+
q.children[dimension] = &fairq[T]{}
47+
q.order = append([]*string{ptr.To(dimension)}, q.order...)
48+
}
49+
50+
q.children[dimension].add(i, dimensions...)
51+
}
52+
53+
func (q *fairq[T]) dequeue() (*item[T], bool) {
54+
var item *item[T]
55+
var hasItem bool
56+
57+
for idx, dimension := range q.order {
58+
if dimension != nil { // child element
59+
item, hasItem = q.children[*dimension].dequeue()
60+
if !hasItem {
61+
continue
62+
}
63+
} else if len(q.items) > 0 { // leaf element
64+
item = q.items[0]
65+
q.items = q.items[1:]
66+
} else {
67+
continue
68+
}
69+
70+
q.order = append(q.order[:idx], q.order[idx+1:]...)
71+
q.order = append(q.order, dimension)
72+
73+
return item, true
74+
}
75+
76+
return item, false
77+
}
78+
79+
func (q *fairq[T]) drain() {
80+
for _, child := range q.children {
81+
child.drain()
82+
}
83+
q.items = nil
84+
}
85+
86+
func (q *fairq[T]) isEmpty() bool {
87+
if len(q.items) > 0 {
88+
return false
89+
}
90+
for _, child := range q.children {
91+
if !child.isEmpty() {
92+
return false
93+
}
94+
}
95+
return true
96+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package priorityqueue
2+
3+
import (
4+
"strings"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
var _ = Describe("FairQueue", func() {
11+
DescribeTable("Ordering tests",
12+
Entry("simple",
13+
[]string{"foo", "foo", "bar"},
14+
[]string{"bar", "foo", "foo"},
15+
),
16+
Entry("balanced",
17+
[]string{"foo", "foo", "bar", "bar"},
18+
[]string{"bar", "foo", "bar", "foo"},
19+
),
20+
Entry("mixed-dimensional, multi-dimensional comes first",
21+
[]string{"foo-foo", "foo-foo", "foo"},
22+
[]string{"foo", "foo-foo", "foo-foo"},
23+
),
24+
Entry("mixed-dimensional, single dimension comes first",
25+
[]string{"foo", "foo-foo", "foo-foo"},
26+
[]string{"foo-foo", "foo", "foo-foo"},
27+
),
28+
Entry("complex mixed-dimensional",
29+
[]string{"bar", "foo", "foo-foo", "foo-foo", "foo-bar"},
30+
// First the foo with the latest-added second dimension, then the bar,
31+
// then the remaining foos based on the second dimension.
32+
[]string{"foo-bar", "bar", "foo-foo", "foo", "foo-foo"},
33+
),
34+
35+
func(adds []string, expected []string) {
36+
q := &fairq[string]{}
37+
for idx, add := range adds {
38+
q.add(&item[string]{Key: add, AddedCounter: uint64(idx)}, strings.Split(add, "-")...)
39+
}
40+
41+
actual := make([]string, 0, len(expected))
42+
for range len(expected) {
43+
item, didGetItem := q.dequeue()
44+
Expect(didGetItem).To(BeTrue())
45+
actual = append(actual, item.Key)
46+
}
47+
48+
_, didGetItem := q.dequeue()
49+
Expect(didGetItem).To(BeFalse())
50+
Expect(actual).To(Equal(expected))
51+
},
52+
)
53+
54+
It("retains fairness across queue operations", func() {
55+
q := &fairq[string]{}
56+
q.add(&item[string]{Key: "foo"}, "foo")
57+
_, _ = q.dequeue()
58+
q.add(&item[string]{Key: "bar", AddedCounter: 1}, "bar")
59+
q.add(&item[string]{Key: "foo", AddedCounter: 2}, "foo")
60+
61+
item, _ := q.dequeue()
62+
Expect(item.Key).To(Equal("bar"))
63+
Expect(q.isEmpty()).To(BeFalse())
64+
})
65+
66+
It("sorts by added counter", func() {
67+
q := &fairq[string]{}
68+
q.add(&item[string]{Key: "foo", AddedCounter: 2})
69+
q.add(&item[string]{Key: "bar", AddedCounter: 1})
70+
71+
item, _ := q.dequeue()
72+
Expect(item.Key).To(Equal("bar"))
73+
Expect(q.isEmpty()).To(BeFalse())
74+
})
75+
76+
It("drains all items", func() {
77+
q := &fairq[string]{}
78+
q.add(&item[string]{Key: "foo"}, "foo")
79+
q.add(&item[string]{Key: "bar"}, "bar")
80+
q.add(&item[string]{Key: "baz"}, "baz")
81+
82+
q.drain()
83+
_, gotItem := q.dequeue()
84+
Expect(gotItem).To(BeFalse())
85+
Expect(q.isEmpty()).To(BeTrue())
86+
})
87+
})

0 commit comments

Comments
 (0)