diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 0603f4cde5..cd50e1e0a0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -25,7 +25,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -33,10 +35,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/recorder" ) // Cluster provides various methods to interact with a cluster. type Cluster interface { + recorder.Provider + // GetHTTPClient returns an HTTP client that can be used to talk to the apiserver GetHTTPClient() *http.Client @@ -58,9 +63,6 @@ type Cluster interface { // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder - // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper @@ -228,6 +230,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. + // Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil). recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err @@ -281,16 +284,24 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { options.newRecorderProvider = intrec.NewProvider } + // This is duplicated with pkg/manager, we need it here to provide + // the user with an EventBroadcaster and there for the Leader election + evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient) + if err != nil { + return options, err + } + // This is duplicated with pkg/manager, we need it here to provide // the user with an EventBroadcaster and there for the Leader election if options.EventBroadcaster == nil { // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true } } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false } } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index c08a742403..de756802c0 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() { c, err := New(nil) Expect(c).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() { }) Expect(c).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() { Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("expected error")) }) - }) Describe("Start", func() { @@ -160,7 +157,7 @@ var _ = Describe("cluster.Cluster", func() { It("should provide a function to get the EventRecorder", func() { c, err := New(cfg) Expect(err).NotTo(HaveOccurred()) - Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) It("should provide a function to get the APIReader", func() { c, err := New(cfg) diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go index 2742764231..755f83b546 100644 --- a/pkg/cluster/internal.go +++ b/pkg/cluster/internal.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder { return c.recorderProvider.GetEventRecorderFor(name) } +func (c *cluster) GetEventRecorder(name string) events.EventRecorder { + return c.recorderProvider.GetEventRecorder(name) +} + func (c *cluster) GetRESTMapper() meta.RESTMapper { return c.mapper } diff --git a/pkg/internal/recorder/recorder.go b/pkg/internal/recorder/recorder.go index 21f0146ba3..7772587da6 100644 --- a/pkg/internal/recorder/recorder.go +++ b/pkg/internal/recorder/recorder.go @@ -27,13 +27,15 @@ import ( "k8s.io/apimachinery/pkg/runtime" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" ) // EventBroadcasterProducer makes an event broadcaster, returning // whether or not the broadcaster should be stopped with the Provider, // or not (e.g. if it's shared, it shouldn't be stopped with the Provider). -type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool) +// This producer currently produces both a +type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool) // Provider is a recorder.Provider that records events to the k8s API server // and to a logr Logger. @@ -49,8 +51,10 @@ type Provider struct { makeBroadcaster EventBroadcasterProducer broadcasterOnce sync.Once - broadcaster record.EventBroadcaster - stopBroadcaster bool + broadcaster events.EventBroadcaster + // Deprecated: will be removed in a future release. Use the broadcaster above instead. + deprecatedBroadcaster record.EventBroadcaster + stopBroadcaster bool } // NB(directxman12): this manually implements Stop instead of Being a runnable because we need to @@ -71,10 +75,11 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // almost certainly already been started (e.g. by leader election). We // need to invoke this to ensure that we don't inadvertently race with // an invocation of getBroadcaster. - broadcaster := p.getBroadcaster() + deprecatedBroadcaster, broadcaster := p.getBroadcaster() if p.stopBroadcaster { p.lock.Lock() broadcaster.Shutdown() + deprecatedBroadcaster.Shutdown() p.stopped = true p.lock.Unlock() } @@ -89,7 +94,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) { // getBroadcaster ensures that a broadcaster is started for this // provider, and returns it. It's threadsafe. -func (p *Provider) getBroadcaster() record.EventBroadcaster { +func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) { // NB(directxman12): this can technically still leak if something calls // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we // create the broadcaster in start, we could race with other things that @@ -97,17 +102,21 @@ func (p *Provider) getBroadcaster() record.EventBroadcaster { // silently swallowing events and more locking, but that seems suboptimal. p.broadcasterOnce.Do(func() { - broadcaster, stop := p.makeBroadcaster() - broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) - broadcaster.StartEventWatcher( + p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() + + // init old broadcaster + p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) + p.deprecatedBroadcaster.StartEventWatcher( func(e *corev1.Event) { p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) }) - p.broadcaster = broadcaster - p.stopBroadcaster = stop + + // init new broadcaster + // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider. + _ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO()) }) - return p.broadcaster + return p.deprecatedBroadcaster, p.broadcaster } // NewProvider create a new Provider instance. @@ -128,6 +137,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S // GetEventRecorderFor returns an event recorder that broadcasts to this provider's // broadcaster. All events will be associated with a component of the given name. func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { + return &deprecatedRecorder{ + prov: p, + name: name, + } +} + +// GetEventRecorder returns an event recorder that broadcasts to this provider's +// broadcaster. All events will be associated with a component of the given name. +func (p *Provider) GetEventRecorder(name string) events.EventRecorder { return &lazyRecorder{ prov: p, name: name, @@ -141,18 +159,46 @@ type lazyRecorder struct { name string recOnce sync.Once - rec record.EventRecorder + rec events.EventRecorder } // ensureRecording ensures that a concrete recorder is populated for this recorder. func (l *lazyRecorder) ensureRecording() { l.recOnce.Do(func() { - broadcaster := l.prov.getBroadcaster() - l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) + _, broadcaster := l.prov.getBroadcaster() + l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name) }) } -func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) { +func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) { + l.ensureRecording() + + l.prov.lock.RLock() + if !l.prov.stopped { + l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...) + } + l.prov.lock.RUnlock() +} + +// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release. +// Deprecated: will be removed in a future release. +type deprecatedRecorder struct { + prov *Provider + name string + + recOnce sync.Once + rec record.EventRecorder +} + +// ensureRecording ensures that a concrete recorder is populated for this recorder. +func (l *deprecatedRecorder) ensureRecording() { + l.recOnce.Do(func() { + deprecatedBroadcaster, _ := l.prov.getBroadcaster() + l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) + }) +} + +func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) { l.ensureRecording() l.prov.lock.RLock() @@ -161,7 +207,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s } l.prov.lock.RUnlock() } -func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + +func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { l.ensureRecording() l.prov.lock.RLock() @@ -170,7 +217,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF } l.prov.lock.RUnlock() } -func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + +func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { l.ensureRecording() l.prov.lock.RLock() diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index c278fbde79..f1fd74f49c 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -43,7 +43,7 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") - recorder := cm.GetEventRecorderFor("test-recorder") + recorder := cm.GetEventRecorderFor("test-recorder") //nolint:staticcheck instance, err := controller.New("foo-controller", cm, controller.Options{ Reconciler: reconcile.Func( func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/pkg/internal/recorder/recorder_test.go b/pkg/internal/recorder/recorder_test.go index e226e165a3..cf51ee8f8d 100644 --- a/pkg/internal/recorder/recorder_test.go +++ b/pkg/internal/recorder/recorder_test.go @@ -21,12 +21,20 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/internal/recorder" ) var _ = Describe("recorder.Provider", func() { - makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true } + evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient) + Expect(err).NotTo(HaveOccurred()) + + makeBroadcaster := func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true + } + Describe("NewProvider", func() { It("should return a provider instance and a nil error.", func() { provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) @@ -43,7 +51,7 @@ var _ = Describe("recorder.Provider", func() { Expect(err.Error()).To(ContainSubstring("failed to init client")) }) }) - Describe("GetEventRecorder", func() { + Describe("GetEventRecorderFor", func() { It("should return a recorder instance.", func() { provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 6c013e7992..09534df3f0 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -128,7 +128,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: recorderProvider.GetEventRecorderFor(id), + EventRecorder: recorderProvider.GetEventRecorderFor(id), //nolint:staticcheck }, options.LeaderLabels, ) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a9f91cbdd5..859a75b947 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -32,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -256,7 +257,11 @@ func (cm *controllerManager) GetCache() cache.Cache { } func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.cluster.GetEventRecorderFor(name) + return cm.cluster.GetEventRecorderFor(name) //nolint:staticcheck +} + +func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder { + return cm.cluster.GetEventRecorder(name) } func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e0e94245e7..69191ee5a9 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,7 +29,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" @@ -337,7 +339,11 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, errors.New("must specify Config") } // Set default values for options fields - options = setOptionsDefaults(options) + options, err := setOptionsDefaults(config, options) + if err != nil { + options.Logger.Error(err, "Failed to set defaults") + return nil, err + } cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme @@ -493,7 +499,7 @@ func defaultBaseContext() context.Context { } // setOptionsDefaults set default values for Options fields. -func setOptionsDefaults(options Options) Options { +func setOptionsDefaults(config *rest.Config, options Options) (Options, error) { // Allow newResourceLock to be mocked if options.newResourceLock == nil { options.newResourceLock = leaderelection.NewResourceLock @@ -507,14 +513,25 @@ func setOptionsDefaults(options Options) Options { // This is duplicated with pkg/cluster, we need it here // for the leader election and there to provide the user with // an EventBroadcaster + httpClient, err := rest.HTTPClientFor(config) + if err != nil { + return options, err + } + + evtCl, err := eventsv1client.NewForConfigAndClient(config, httpClient) + if err != nil { + return options, err + } + if options.EventBroadcaster == nil { // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true } } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false + // keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one. + options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) { + return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false } } @@ -571,5 +588,5 @@ func setOptionsDefaults(options Options) Options { options.WebhookServer = webhook.NewServer(webhook.Options{}) } - return options + return options, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 4363d62f59..f1b8ef4724 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -60,7 +60,6 @@ var _ = Describe("manger.Manager", func() { m, err := New(nil, Options{}) Expect(m).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) - }) It("should return an error if it can't create a RestMapper", func() { @@ -70,7 +69,6 @@ var _ = Describe("manger.Manager", func() { }) Expect(m).To(BeNil()) Expect(err).To(Equal(expected)) - }) It("should return an error it can't create a client.Client", func() { @@ -207,7 +205,6 @@ var _ = Describe("manger.Manager", func() { } // Don't leak routines <-mgrDone - }) It("should disable gracefulShutdown when stopping to lead", func(ctx SpecContext) { m, err := New(cfg, Options{ @@ -443,7 +440,6 @@ var _ = Describe("manger.Manager", func() { Expect(ok).To(BeTrue()) _, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock) Expect(isLeaseLock).To(BeTrue()) - }) It("should use the specified ResourceLock", func() { m, err := New(cfg, Options{ @@ -671,7 +667,7 @@ var _ = Describe("manger.Manager", func() { }) Describe("Start", func() { - var startSuite = func(options Options, callbacks ...func(Manager)) { + startSuite := func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(ctx SpecContext) { m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) @@ -1256,7 +1252,6 @@ var _ = Describe("manger.Manager", func() { <-managerStopDone Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) }) - } Context("with defaults", func() { @@ -1790,7 +1785,6 @@ var _ = Describe("manger.Manager", func() { err = m.Start(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("manager already started")) - }) }) @@ -1820,7 +1814,7 @@ var _ = Describe("manger.Manager", func() { ns := corev1.Namespace{} ns.Name = "default" - recorder := m.GetEventRecorderFor("rock-and-roll") + recorder := m.GetEventRecorderFor("rock-and-roll") //nolint:staticcheck Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil @@ -1941,7 +1935,7 @@ var _ = Describe("manger.Manager", func() { It("should provide a function to get the EventRecorder", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) + Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck }) It("should provide a function to get the APIReader", func() { m, err := New(cfg, Options{}) @@ -2020,8 +2014,7 @@ var _ = Describe("manger.Manager", func() { }) }) -type runnableError struct { -} +type runnableError struct{} func (runnableError) Error() string { return "not feeling like that" diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go index f093f0a726..2dee673a91 100644 --- a/pkg/recorder/recorder.go +++ b/pkg/recorder/recorder.go @@ -21,11 +21,17 @@ limitations under the License. package recorder import ( + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/record" ) // Provider knows how to generate new event recorders with given name. type Provider interface { - // NewRecorder returns an EventRecorder with given name. + // GetEventRecorder returns a EventRecorder with given name. + // + // Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead. GetEventRecorderFor(name string) record.EventRecorder + // GetEventRecorder returns an EventRecorder for the old events API. + // The old API is not 100% supported anymore, use the new one whenever possible. + GetEventRecorder(name string) events.EventRecorder }