Skip to content

⚠️ Migration to the new events API #3262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,23 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
recorder.Provider

// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver
GetHTTPClient() *http.Client

Expand All @@ -58,9 +63,6 @@ type Cluster interface {
// GetFieldIndexer returns a client.FieldIndexer configured with the client
GetFieldIndexer() client.FieldIndexer

// GetEventRecorderFor returns a new EventRecorder for the provided name
GetEventRecorderFor(name string) record.EventRecorder

// GetRESTMapper returns a RESTMapper
GetRESTMapper() meta.RESTMapper

Expand Down Expand Up @@ -228,6 +230,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
// Create the recorder provider to inject event recorders for the components.
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
// to the particular controller that it's being injected into, rather than a generic one like is here.
// Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil).
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
if err != nil {
return nil, err
Expand Down Expand Up @@ -281,16 +284,24 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
options.newRecorderProvider = intrec.NewProvider
}

// This is duplicated with pkg/manager, we need it here to provide
// the user with an EventBroadcaster and there for the Leader election
evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient)
if err != nil {
return options, err
}

// This is duplicated with pkg/manager, we need it here to provide
// the user with an EventBroadcaster and there for the Leader election
if options.EventBroadcaster == nil {
// defer initialization to avoid leaking by default
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
return record.NewBroadcaster(), true
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
}
} else {
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
return options.EventBroadcaster, false
// keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one.
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() {
c, err := New(nil)
Expect(c).To(BeNil())
Expect(err.Error()).To(ContainSubstring("must specify Config"))

})

It("should return an error if it can't create a RestMapper", func() {
Expand All @@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() {
})
Expect(c).To(BeNil())
Expect(err).To(Equal(expected))

})

It("should return an error it can't create a client.Client", func() {
Expand Down Expand Up @@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("expected error"))
})

})

Describe("Start", func() {
Expand Down Expand Up @@ -160,7 +157,7 @@ var _ = Describe("cluster.Cluster", func() {
It("should provide a function to get the EventRecorder", func() {
c, err := New(cfg)
Expect(err).NotTo(HaveOccurred())
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil())
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck
})
It("should provide a function to get the APIReader", func() {
c, err := New(cfg)
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"

"sigs.k8s.io/controller-runtime/pkg/cache"
Expand Down Expand Up @@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder {
return c.recorderProvider.GetEventRecorderFor(name)
}

func (c *cluster) GetEventRecorder(name string) events.EventRecorder {
return c.recorderProvider.GetEventRecorder(name)
}

func (c *cluster) GetRESTMapper() meta.RESTMapper {
return c.mapper
}
Expand Down
82 changes: 65 additions & 17 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"k8s.io/apimachinery/pkg/runtime"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
)

// EventBroadcasterProducer makes an event broadcaster, returning
// whether or not the broadcaster should be stopped with the Provider,
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
// This producer currently produces both a
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool)

// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
Expand All @@ -49,8 +51,10 @@ type Provider struct {
makeBroadcaster EventBroadcasterProducer

broadcasterOnce sync.Once
broadcaster record.EventBroadcaster
stopBroadcaster bool
broadcaster events.EventBroadcaster
// Deprecated: will be removed in a future release. Use the broadcaster above instead.
deprecatedBroadcaster record.EventBroadcaster
stopBroadcaster bool
}

// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
Expand All @@ -71,10 +75,11 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
// almost certainly already been started (e.g. by leader election). We
// need to invoke this to ensure that we don't inadvertently race with
// an invocation of getBroadcaster.
broadcaster := p.getBroadcaster()
deprecatedBroadcaster, broadcaster := p.getBroadcaster()
if p.stopBroadcaster {
p.lock.Lock()
broadcaster.Shutdown()
deprecatedBroadcaster.Shutdown()
p.stopped = true
p.lock.Unlock()
}
Expand All @@ -89,25 +94,29 @@ func (p *Provider) Stop(shutdownCtx context.Context) {

// getBroadcaster ensures that a broadcaster is started for this
// provider, and returns it. It's threadsafe.
func (p *Provider) getBroadcaster() record.EventBroadcaster {
func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) {
// NB(directxman12): this can technically still leak if something calls
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
// create the broadcaster in start, we could race with other things that
// are started at the same time & want to emit events. The alternative is
// silently swallowing events and more locking, but that seems suboptimal.

p.broadcasterOnce.Do(func() {
broadcaster, stop := p.makeBroadcaster()
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
broadcaster.StartEventWatcher(
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster()

// init old broadcaster
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
p.deprecatedBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
})
p.broadcaster = broadcaster
p.stopBroadcaster = stop

// init new broadcaster
// TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider.
_ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO())
})

return p.broadcaster
return p.deprecatedBroadcaster, p.broadcaster
}

// NewProvider create a new Provider instance.
Expand All @@ -128,6 +137,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
return &deprecatedRecorder{
prov: p,
name: name,
}
}

// GetEventRecorder returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorder(name string) events.EventRecorder {
return &lazyRecorder{
prov: p,
name: name,
Expand All @@ -141,18 +159,46 @@ type lazyRecorder struct {
name string

recOnce sync.Once
rec record.EventRecorder
rec events.EventRecorder
}

// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *lazyRecorder) ensureRecording() {
l.recOnce.Do(func() {
broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
_, broadcaster := l.prov.getBroadcaster()
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name)
})
}

func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
if !l.prov.stopped {
l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...)
}
l.prov.lock.RUnlock()
}

// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release.
// Deprecated: will be removed in a future release.
type deprecatedRecorder struct {
prov *Provider
name string

recOnce sync.Once
rec record.EventRecorder
}

// ensureRecording ensures that a concrete recorder is populated for this recorder.
func (l *deprecatedRecorder) ensureRecording() {
l.recOnce.Do(func() {
deprecatedBroadcaster, _ := l.prov.getBroadcaster()
l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
})
}

func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()

l.prov.lock.RLock()
Expand All @@ -161,7 +207,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {

func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
Expand All @@ -170,7 +217,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF
}
l.prov.lock.RUnlock()
}
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {

func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) {
l.ensureRecording()

l.prov.lock.RLock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/recorder/recorder_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var _ = Describe("recorder", func() {
Expect(err).NotTo(HaveOccurred())

By("Creating the Controller")
recorder := cm.GetEventRecorderFor("test-recorder")
recorder := cm.GetEventRecorderFor("test-recorder") //nolint:staticcheck
instance, err := controller.New("foo-controller", cm, controller.Options{
Reconciler: reconcile.Func(
func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/internal/recorder"
)

var _ = Describe("recorder.Provider", func() {
makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true }
evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient)
Expect(err).NotTo(HaveOccurred())

makeBroadcaster := func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
}

Describe("NewProvider", func() {
It("should return a provider instance and a nil error.", func() {
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
Expand All @@ -43,7 +51,7 @@ var _ = Describe("recorder.Provider", func() {
Expect(err.Error()).To(ContainSubstring("failed to init client"))
})
})
Describe("GetEventRecorder", func() {
Describe("GetEventRecorderFor", func() {
It("should return a recorder instance.", func() {
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion pkg/leaderelection/leader_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
coordinationClient,
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorderProvider.GetEventRecorderFor(id),
EventRecorder: recorderProvider.GetEventRecorderFor(id), //nolint:staticcheck
},
options.LeaderLabels,
)
Expand Down
7 changes: 6 additions & 1 deletion pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -256,7 +257,11 @@ func (cm *controllerManager) GetCache() cache.Cache {
}

func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
return cm.cluster.GetEventRecorderFor(name)
return cm.cluster.GetEventRecorderFor(name) //nolint:staticcheck
}

func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder {
return cm.cluster.GetEventRecorder(name)
}

func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
Expand Down
Loading