Skip to content

Commit 6e35b34

Browse files
authored
✨ Implement warm replica support for controllers (#3192)
* [Warm Replicas] Implement warm replica support for controllers. * Remove irrelevant runnable_group.go code. * Rename ShouldWarmup. * fmt * Change to atomic.Bool to avoid race in test. * Address comments. * Add ready check to block controller startup until warmup is complete. * Keep test helper structs private. * Address comments. * Fix lint. * Address naming + comments from sbueringer. * Refactor tests to use HaveValue. * Document + add UT for WaitForWarmupComplete behavior on ctx cancellation. * Add unit test that exercises controller warmup integration with manager. * Add UT that verifies WaitForWarmupComplete blocking / non-blocking behavior. * Verify r.Others.startQueue in runnables test cases. * Fix UT to verify runnable ordering. * Fix UT for WaitForWarmupComplete blocking. * Document !NeedLeaderElection+NeedWarmup behavior * Fix test race. * Cleanup test wrapper runnables. * Make didStartEventSources run once with sync.Once + UT. * Rewrite Warmup to avoid polling. * Rename NeedWarmup to EnableWarmup. * Clarify comment on Warmup. * Move reset watches critical section inside of startEventSources. * Add test to assert startEventSources blocking behavior. * Make Start threadsafe with Warmup + UT. * Change warmup to use buffered error channel and add New method. * Fail in warmup directly and rely on sync.Once for warmup thread-safety without WaitForWarmupComplete. * Sync controller EnableWarmup comments. * Rename to startEventSourcesLocked and lock with c.mu * Address edge case for watch added after warmup completes. * Fix test description and set leaderelection==true * Fix lint. * Change shutdown order to shutdown warmup runnables in parallel with other runnables. * Fix test races by ensuring goroutines do not outlive their It blocks. * Block on source start on context cancel. * Guard access to c.Queue explicitly. * Initialize queue in warmup with test. * Fix watch comment. * Add warmup to manager and controller integration tests. * fmt + lint. * Add tests for Warmup for parity with Start. * golangci-lint * Increase test coverage for start with warmup race. * Remove synchronization code around c.Queue concurrent access. * Assert startedEventSourcesAndQueue is set in tests. * fmt * Clarify namespace predicate vs. cleanup in controller integration test. * Add option for fake leader elector package and assert that warmup runnables are started before leader election. * fmt + rename * Address PR comments.
1 parent 948d554 commit 6e35b34

File tree

13 files changed

+1348
-128
lines changed

13 files changed

+1348
-128
lines changed

pkg/config/controller.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ type Controller struct {
6060
// Defaults to true, which means the controller will use leader election.
6161
NeedLeaderElection *bool
6262

63+
// EnableWarmup specifies whether the controller should start its sources when the manager is not
64+
// the leader. This is useful for cases where sources take a long time to start, as it allows
65+
// for the controller to warm up its caches even before it is elected as the leader. This
66+
// improves leadership failover time, as the caches will be prepopulated before the controller
67+
// transitions to be leader.
68+
//
69+
// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
70+
// sources without waiting to become leader.
71+
// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without
72+
// leader election do not wait on leader election to start their sources.
73+
// Defaults to false.
74+
EnableWarmup *bool
75+
6376
// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
6477
// priority queue.
6578
//

pkg/controller/controller.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,19 @@ type TypedOptions[request comparable] struct {
9393
//
9494
// Note: This flag is disabled by default until a future version. It's currently in beta.
9595
UsePriorityQueue *bool
96+
97+
// EnableWarmup specifies whether the controller should start its sources when the manager is not
98+
// the leader. This is useful for cases where sources take a long time to start, as it allows
99+
// for the controller to warm up its caches even before it is elected as the leader. This
100+
// improves leadership failover time, as the caches will be prepopulated before the controller
101+
// transitions to be leader.
102+
//
103+
// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
104+
// sources without waiting to become leader.
105+
// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op as controllers without
106+
// leader election do not wait on leader election to start their sources.
107+
// Defaults to false.
108+
EnableWarmup *bool
96109
}
97110

98111
// DefaultFromConfig defaults the config from a config.Controller
@@ -124,6 +137,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller
124137
if options.NeedLeaderElection == nil {
125138
options.NeedLeaderElection = config.NeedLeaderElection
126139
}
140+
141+
if options.EnableWarmup == nil {
142+
options.EnableWarmup = config.EnableWarmup
143+
}
127144
}
128145

129146
// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
@@ -243,7 +260,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
243260
}
244261

245262
// Create controller with dependencies set
246-
return &controller.Controller[request]{
263+
return controller.New[request](controller.Options[request]{
247264
Do: options.Reconciler,
248265
RateLimiter: options.RateLimiter,
249266
NewQueue: options.NewQueue,
@@ -253,7 +270,8 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
253270
LogConstructor: options.LogConstructor,
254271
RecoverPanic: options.RecoverPanic,
255272
LeaderElected: options.NeedLeaderElection,
256-
}, nil
273+
EnableWarmup: options.EnableWarmup,
274+
}), nil
257275
}
258276

259277
// ReconcileIDFromContext gets the reconcileID from the current context.

pkg/controller/controller_integration_test.go

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@ package controller_test
1818

1919
import (
2020
"context"
21+
"fmt"
22+
"strconv"
2123

2224
appsv1 "k8s.io/api/apps/v1"
2325
corev1 "k8s.io/api/core/v1"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527
"k8s.io/apimachinery/pkg/runtime/schema"
2628
"k8s.io/apimachinery/pkg/types"
29+
"k8s.io/utils/ptr"
2730
"sigs.k8s.io/controller-runtime/pkg/cache"
31+
"sigs.k8s.io/controller-runtime/pkg/client"
2832
"sigs.k8s.io/controller-runtime/pkg/controller"
2933
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
3034
"sigs.k8s.io/controller-runtime/pkg/handler"
35+
"sigs.k8s.io/controller-runtime/pkg/predicate"
3136
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3237
"sigs.k8s.io/controller-runtime/pkg/source"
3338

@@ -48,30 +53,48 @@ var _ = Describe("controller", func() {
4853
Describe("controller", func() {
4954
// TODO(directxman12): write a whole suite of controller-client interaction tests
5055

51-
It("should reconcile", func() {
56+
// The watches in this test are setup with a namespace predicate to avoid each table entry
57+
// from interfering with the others. We cannot add a delete call for the pods created in the
58+
// test, as it causes flakes with the api-server termination timing out.
59+
// See https://github.com/kubernetes-sigs/controller-runtime/issues/1571 for a description
60+
// of the issue, and a discussion here: https://github.com/kubernetes-sigs/controller-runtime/pull/3192#discussion_r2186967799
61+
DescribeTable("should reconcile", func(enableWarmup bool) {
5262
By("Creating the Manager")
5363
cm, err := manager.New(cfg, manager.Options{})
5464
Expect(err).NotTo(HaveOccurred())
5565

5666
By("Creating the Controller")
57-
instance, err := controller.New("foo-controller", cm, controller.Options{
58-
Reconciler: reconcile.Func(
59-
func(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
60-
reconciled <- request
61-
return reconcile.Result{}, nil
62-
}),
63-
})
67+
instance, err := controller.New(
68+
fmt.Sprintf("foo-controller-%t", enableWarmup),
69+
cm,
70+
controller.Options{
71+
Reconciler: reconcile.Func(
72+
func(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
73+
reconciled <- request
74+
return reconcile.Result{}, nil
75+
}),
76+
EnableWarmup: ptr.To(enableWarmup),
77+
},
78+
)
6479
Expect(err).NotTo(HaveOccurred())
6580

81+
testNamespace := strconv.FormatBool(enableWarmup)
82+
6683
By("Watching Resources")
6784
err = instance.Watch(
6885
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
6986
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
87+
makeNamespacePredicate[*appsv1.ReplicaSet](testNamespace),
7088
),
7189
)
7290
Expect(err).NotTo(HaveOccurred())
7391

74-
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
92+
err = instance.Watch(
93+
source.Kind(cm.GetCache(), &appsv1.Deployment{},
94+
&handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{},
95+
makeNamespacePredicate[*appsv1.Deployment](testNamespace),
96+
),
97+
)
7598
Expect(err).NotTo(HaveOccurred())
7699

77100
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
@@ -110,19 +133,25 @@ var _ = Describe("controller", func() {
110133
},
111134
}
112135
expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{
113-
Namespace: "default",
136+
Namespace: testNamespace,
114137
Name: "deployment-name",
115138
}}
116139

140+
By("Creating the test namespace")
141+
_, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
142+
ObjectMeta: metav1.ObjectMeta{Name: testNamespace},
143+
}, metav1.CreateOptions{})
144+
Expect(err).NotTo(HaveOccurred())
145+
117146
By("Invoking Reconciling for Create")
118-
deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
147+
deployment, err = clientset.AppsV1().Deployments(testNamespace).Create(ctx, deployment, metav1.CreateOptions{})
119148
Expect(err).NotTo(HaveOccurred())
120149
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
121150

122151
By("Invoking Reconciling for Update")
123152
newDeployment := deployment.DeepCopy()
124153
newDeployment.Labels = map[string]string{"foo": "bar"}
125-
_, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{})
154+
_, err = clientset.AppsV1().Deployments(testNamespace).Update(ctx, newDeployment, metav1.UpdateOptions{})
126155
Expect(err).NotTo(HaveOccurred())
127156
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
128157

@@ -145,24 +174,24 @@ var _ = Describe("controller", func() {
145174
Template: deployment.Spec.Template,
146175
},
147176
}
148-
replicaset, err = clientset.AppsV1().ReplicaSets("default").Create(ctx, replicaset, metav1.CreateOptions{})
177+
replicaset, err = clientset.AppsV1().ReplicaSets(testNamespace).Create(ctx, replicaset, metav1.CreateOptions{})
149178
Expect(err).NotTo(HaveOccurred())
150179
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
151180

152181
By("Invoking Reconciling for an OwnedObject when it is updated")
153182
newReplicaset := replicaset.DeepCopy()
154183
newReplicaset.Labels = map[string]string{"foo": "bar"}
155-
_, err = clientset.AppsV1().ReplicaSets("default").Update(ctx, newReplicaset, metav1.UpdateOptions{})
184+
_, err = clientset.AppsV1().ReplicaSets(testNamespace).Update(ctx, newReplicaset, metav1.UpdateOptions{})
156185
Expect(err).NotTo(HaveOccurred())
157186
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
158187

159188
By("Invoking Reconciling for an OwnedObject when it is deleted")
160-
err = clientset.AppsV1().ReplicaSets("default").Delete(ctx, replicaset.Name, metav1.DeleteOptions{})
189+
err = clientset.AppsV1().ReplicaSets(testNamespace).Delete(ctx, replicaset.Name, metav1.DeleteOptions{})
161190
Expect(err).NotTo(HaveOccurred())
162191
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
163192

164193
By("Invoking Reconciling for Delete")
165-
err = clientset.AppsV1().Deployments("default").
194+
err = clientset.AppsV1().Deployments(testNamespace).
166195
Delete(ctx, "deployment-name", metav1.DeleteOptions{})
167196
Expect(err).NotTo(HaveOccurred())
168197
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
@@ -174,7 +203,12 @@ var _ = Describe("controller", func() {
174203

175204
By("Invoking Reconciling for a pod when it is created when adding watcher dynamically")
176205
// Add new watcher dynamically
177-
err = instance.Watch(source.Kind(cm.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
206+
err = instance.Watch(
207+
source.Kind(cm.GetCache(), &corev1.Pod{},
208+
&handler.TypedEnqueueRequestForObject[*corev1.Pod]{},
209+
makeNamespacePredicate[*corev1.Pod](testNamespace),
210+
),
211+
)
178212
Expect(err).NotTo(HaveOccurred())
179213

180214
pod := &corev1.Pod{
@@ -194,16 +228,27 @@ var _ = Describe("controller", func() {
194228
},
195229
}
196230
expectedReconcileRequest = reconcile.Request{NamespacedName: types.NamespacedName{
197-
Namespace: "default",
231+
Namespace: testNamespace,
198232
Name: "pod-name",
199233
}}
200-
_, err = clientset.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{})
234+
_, err = clientset.CoreV1().Pods(testNamespace).Create(ctx, pod, metav1.CreateOptions{})
201235
Expect(err).NotTo(HaveOccurred())
202236
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
203-
})
237+
},
238+
Entry("with controller warmup enabled", true),
239+
Entry("with controller warmup not enabled", false),
240+
)
204241
})
205242
})
206243

244+
// makeNamespacePredicate returns a predicate that filters out all objects not in the passed in
245+
// namespace.
246+
func makeNamespacePredicate[object client.Object](namespace string) predicate.TypedPredicate[object] {
247+
return predicate.NewTypedPredicateFuncs[object](func(obj object) bool {
248+
return obj.GetNamespace() == namespace
249+
})
250+
}
251+
207252
func truePtr() *bool {
208253
t := true
209254
return &t

pkg/controller/controller_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,5 +474,71 @@ var _ = Describe("controller.Controller", func() {
474474
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
475475
Expect(ok).To(BeFalse())
476476
})
477+
478+
It("should set EnableWarmup correctly", func() {
479+
m, err := manager.New(cfg, manager.Options{})
480+
Expect(err).NotTo(HaveOccurred())
481+
482+
// Test with EnableWarmup set to true
483+
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
484+
Reconciler: reconcile.Func(nil),
485+
EnableWarmup: ptr.To(true),
486+
})
487+
Expect(err).NotTo(HaveOccurred())
488+
489+
internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
490+
Expect(ok).To(BeTrue())
491+
Expect(internalCtrlWithWarmup.EnableWarmup).To(HaveValue(BeTrue()))
492+
493+
// Test with EnableWarmup set to false
494+
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
495+
Reconciler: reconcile.Func(nil),
496+
EnableWarmup: ptr.To(false),
497+
})
498+
Expect(err).NotTo(HaveOccurred())
499+
500+
internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
501+
Expect(ok).To(BeTrue())
502+
Expect(internalCtrlWithoutWarmup.EnableWarmup).To(HaveValue(BeFalse()))
503+
504+
// Test with EnableWarmup not set (should default to nil)
505+
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
506+
Reconciler: reconcile.Func(nil),
507+
})
508+
Expect(err).NotTo(HaveOccurred())
509+
510+
internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
511+
Expect(ok).To(BeTrue())
512+
Expect(internalCtrlWithDefaultWarmup.EnableWarmup).To(BeNil())
513+
})
514+
515+
It("should inherit EnableWarmup from manager config", func() {
516+
// Test with manager default setting EnableWarmup to true
517+
managerWithWarmup, err := manager.New(cfg, manager.Options{
518+
Controller: config.Controller{
519+
EnableWarmup: ptr.To(true),
520+
},
521+
})
522+
Expect(err).NotTo(HaveOccurred())
523+
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
524+
Reconciler: reconcile.Func(nil),
525+
})
526+
Expect(err).NotTo(HaveOccurred())
527+
528+
internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
529+
Expect(ok).To(BeTrue())
530+
Expect(internalCtrlInheritingWarmup.EnableWarmup).To(HaveValue(BeTrue()))
531+
532+
// Test that explicit controller setting overrides manager setting
533+
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
534+
Reconciler: reconcile.Func(nil),
535+
EnableWarmup: ptr.To(false),
536+
})
537+
Expect(err).NotTo(HaveOccurred())
538+
539+
internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
540+
Expect(ok).To(BeTrue())
541+
Expect(internalCtrlOverridingWarmup.EnableWarmup).To(HaveValue(BeFalse()))
542+
})
477543
})
478544
})

0 commit comments

Comments
 (0)