Skip to content

Commit 54f66ac

Browse files
committed
Base migration to the new events API
Signed-off-by: Borja Clemente <[email protected]>
1 parent e08f24a commit 54f66ac

File tree

13 files changed

+64
-123
lines changed

13 files changed

+64
-123
lines changed

designs/move-cluster-specific-code-out-of-manager.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ type Cluster interface {
6161
// GetCache returns a cache.Cache
6262
GetCache() cache.Cache
6363

64-
// GetEventRecorderFor returns a new EventRecorder for the provided name
65-
GetEventRecorderFor(name string) record.EventRecorder
64+
//GetEventRecorder returns a new EventRecorder for the provided name
65+
GetEventRecorder(name string) record.EventRecorder
6666

6767
// GetRESTMapper returns a RESTMapper
6868
GetRESTMapper() meta.RESTMapper

pkg/cluster/cluster.go

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"k8s.io/apimachinery/pkg/runtime"
2727
"k8s.io/client-go/kubernetes/scheme"
2828
"k8s.io/client-go/rest"
29-
"k8s.io/client-go/tools/record"
29+
"k8s.io/client-go/tools/events"
3030

3131
"sigs.k8s.io/controller-runtime/pkg/cache"
3232
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -58,8 +58,8 @@ type Cluster interface {
5858
// GetFieldIndexer returns a client.FieldIndexer configured with the client
5959
GetFieldIndexer() client.FieldIndexer
6060

61-
// GetEventRecorderFor returns a new EventRecorder for the provided name
62-
GetEventRecorderFor(name string) record.EventRecorder
61+
// GetEventRecorder returns a new EventRecorder for the provided name
62+
GetEventRecorder(name string) events.EventRecorder
6363

6464
// GetRESTMapper returns a RESTMapper
6565
GetRESTMapper() meta.RESTMapper
@@ -126,16 +126,10 @@ type Options struct {
126126
//
127127
// Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers
128128
// is shorter than the lifetime of your process.
129-
EventBroadcaster record.EventBroadcaster
130-
131-
// makeBroadcaster allows deferring the creation of the broadcaster to
132-
// avoid leaking goroutines if we never call Start on this manager. It also
133-
// returns whether or not this is a "owned" broadcaster, and as such should be
134-
// stopped with the manager.
135-
makeBroadcaster intrec.EventBroadcasterProducer
129+
EventBroadcaster events.EventBroadcaster
136130

137131
// Dependency injection for testing
138-
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
132+
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error)
139133
}
140134

141135
// Option can be used to manipulate Options.
@@ -228,7 +222,8 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
228222
// Create the recorder provider to inject event recorders for the components.
229223
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
230224
// to the particular controller that it's being injected into, rather than a generic one like is here.
231-
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
225+
// Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil).
226+
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil)
232227
if err != nil {
233228
return nil, err
234229
}
@@ -281,19 +276,6 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
281276
options.newRecorderProvider = intrec.NewProvider
282277
}
283278

284-
// This is duplicated with pkg/manager, we need it here to provide
285-
// the user with an EventBroadcaster and there for the Leader election
286-
if options.EventBroadcaster == nil {
287-
// defer initialization to avoid leaking by default
288-
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
289-
return record.NewBroadcaster(), true
290-
}
291-
} else {
292-
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
293-
return options.EventBroadcaster, false
294-
}
295-
}
296-
297279
if options.Logger.GetSink() == nil {
298280
options.Logger = logf.RuntimeLog.WithName("cluster")
299281
}

pkg/cluster/cluster_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() {
4040
c, err := New(nil)
4141
Expect(c).To(BeNil())
4242
Expect(err.Error()).To(ContainSubstring("must specify Config"))
43-
4443
})
4544

4645
It("should return an error if it can't create a RestMapper", func() {
@@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() {
5049
})
5150
Expect(c).To(BeNil())
5251
Expect(err).To(Equal(expected))
53-
5452
})
5553

5654
It("should return an error it can't create a client.Client", func() {
@@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() {
9694
Expect(err).To(HaveOccurred())
9795
Expect(err.Error()).To(ContainSubstring("expected error"))
9896
})
99-
10097
})
10198

10299
Describe("Start", func() {
@@ -160,7 +157,7 @@ var _ = Describe("cluster.Cluster", func() {
160157
It("should provide a function to get the EventRecorder", func() {
161158
c, err := New(cfg)
162159
Expect(err).NotTo(HaveOccurred())
163-
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil())
160+
Expect(c.GetEventRecorder("test")).NotTo(BeNil())
164161
})
165162
It("should provide a function to get the APIReader", func() {
166163
c, err := New(cfg)

pkg/cluster/internal.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/client-go/rest"
27-
"k8s.io/client-go/tools/record"
27+
"k8s.io/client-go/tools/events"
2828

2929
"sigs.k8s.io/controller-runtime/pkg/cache"
3030
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -83,8 +83,8 @@ func (c *cluster) GetCache() cache.Cache {
8383
return c.cache
8484
}
8585

86-
func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder {
87-
return c.recorderProvider.GetEventRecorderFor(name)
86+
func (c *cluster) GetEventRecorder(name string) events.EventRecorder {
87+
return c.recorderProvider.GetEventRecorder(name)
8888
}
8989

9090
func (c *cluster) GetRESTMapper() meta.RESTMapper {

pkg/internal/recorder/recorder.go

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@ import (
2323
"sync"
2424

2525
"github.com/go-logr/logr"
26-
corev1 "k8s.io/api/core/v1"
2726
"k8s.io/apimachinery/pkg/runtime"
28-
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
27+
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
2928
"k8s.io/client-go/rest"
30-
"k8s.io/client-go/tools/record"
29+
"k8s.io/client-go/tools/events"
3130
)
3231

3332
// EventBroadcasterProducer makes an event broadcaster, returning
3433
// whether or not the broadcaster should be stopped with the Provider,
3534
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
36-
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
35+
type EventBroadcasterProducer func() (caster events.EventBroadcaster, stopWithProvider bool)
3736

3837
// Provider is a recorder.Provider that records events to the k8s API server
3938
// and to a logr Logger.
@@ -45,11 +44,11 @@ type Provider struct {
4544
scheme *runtime.Scheme
4645
// logger is the logger to use when logging diagnostic event info
4746
logger logr.Logger
48-
evtClient corev1client.EventInterface
47+
evtClient eventsv1client.EventsV1Interface
4948
makeBroadcaster EventBroadcasterProducer
5049

5150
broadcasterOnce sync.Once
52-
broadcaster record.EventBroadcaster
51+
broadcaster events.EventBroadcaster
5352
stopBroadcaster bool
5453
}
5554

@@ -89,45 +88,48 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
8988

9089
// getBroadcaster ensures that a broadcaster is started for this
9190
// provider, and returns it. It's threadsafe.
92-
func (p *Provider) getBroadcaster() record.EventBroadcaster {
91+
func (p *Provider) getBroadcaster() events.EventBroadcaster {
9392
// NB(directxman12): this can technically still leak if something calls
9493
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
9594
// create the broadcaster in start, we could race with other things that
9695
// are started at the same time & want to emit events. The alternative is
9796
// silently swallowing events and more locking, but that seems suboptimal.
9897

9998
p.broadcasterOnce.Do(func() {
100-
broadcaster, stop := p.makeBroadcaster()
101-
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
102-
broadcaster.StartEventWatcher(
103-
func(e *corev1.Event) {
99+
if p.broadcaster == nil {
100+
p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient})
101+
}
102+
// TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider.
103+
p.broadcaster.StartRecordingToSink(nil)
104+
105+
// TODO(clebs): figure out if we still need this and how the change would make sense.
106+
p.broadcaster.StartEventWatcher(
107+
func(e runtime.Object) {
104108
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
105109
})
106-
p.broadcaster = broadcaster
107-
p.stopBroadcaster = stop
108110
})
109111

110112
return p.broadcaster
111113
}
112114

113115
// NewProvider create a new Provider instance.
114-
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
116+
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*Provider, error) {
115117
if httpClient == nil {
116118
panic("httpClient must not be nil")
117119
}
118120

119-
corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)
121+
eventsv1Client, err := eventsv1client.NewForConfigAndClient(config, httpClient)
120122
if err != nil {
121123
return nil, fmt.Errorf("failed to init client: %w", err)
122124
}
123125

124-
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}
126+
p := &Provider{scheme: scheme, logger: logger, broadcaster: broadcaster, stopBroadcaster: stopWithProvider, evtClient: eventsv1Client}
125127
return p, nil
126128
}
127129

128-
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
130+
// GetEventRecorder returns an event recorder that broadcasts to this provider's
129131
// broadcaster. All events will be associated with a component of the given name.
130-
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
132+
func (p *Provider) GetEventRecorder(name string) events.EventRecorder {
131133
return &lazyRecorder{
132134
prov: p,
133135
name: name,
@@ -141,41 +143,23 @@ type lazyRecorder struct {
141143
name string
142144

143145
recOnce sync.Once
144-
rec record.EventRecorder
146+
rec events.EventRecorder
145147
}
146148

147149
// ensureRecording ensures that a concrete recorder is populated for this recorder.
148150
func (l *lazyRecorder) ensureRecording() {
149151
l.recOnce.Do(func() {
150152
broadcaster := l.prov.getBroadcaster()
151-
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
153+
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name)
152154
})
153155
}
154156

155-
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
156-
l.ensureRecording()
157-
158-
l.prov.lock.RLock()
159-
if !l.prov.stopped {
160-
l.rec.Event(object, eventtype, reason, message)
161-
}
162-
l.prov.lock.RUnlock()
163-
}
164-
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
165-
l.ensureRecording()
166-
167-
l.prov.lock.RLock()
168-
if !l.prov.stopped {
169-
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
170-
}
171-
l.prov.lock.RUnlock()
172-
}
173-
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
157+
func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
174158
l.ensureRecording()
175159

176160
l.prov.lock.RLock()
177161
if !l.prov.stopped {
178-
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
162+
l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...)
179163
}
180164
l.prov.lock.RUnlock()
181165
}

pkg/internal/recorder/recorder_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var _ = Describe("recorder", func() {
4343
Expect(err).NotTo(HaveOccurred())
4444

4545
By("Creating the Controller")
46-
recorder := cm.GetEventRecorderFor("test-recorder")
46+
recorder := cm.GetEventRecorder("test-recorder")
4747
instance, err := controller.New("foo-controller", cm, controller.Options{
4848
Reconciler: reconcile.Func(
4949
func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {

pkg/internal/recorder/recorder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ var _ = Describe("recorder.Provider", func() {
4848
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
4949
Expect(err).NotTo(HaveOccurred())
5050

51-
recorder := provider.GetEventRecorderFor("test")
51+
recorder := provider.GetEventRecorder("test")
5252
Expect(recorder).NotTo(BeNil())
5353
})
5454
})

pkg/leaderelection/leader_election.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,10 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
126126
options.LeaderElectionID,
127127
corev1Client,
128128
coordinationClient,
129+
// TODO(clebs): figure out how to solve this.
129130
resourcelock.ResourceLockConfig{
130131
Identity: id,
131-
EventRecorder: recorderProvider.GetEventRecorderFor(id),
132+
EventRecorder: recorderProvider.GetEventRecorder(id),
132133
},
133134
options.LeaderLabels,
134135
)

pkg/manager/internal.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import (
3232
"k8s.io/apimachinery/pkg/runtime"
3333
kerrors "k8s.io/apimachinery/pkg/util/errors"
3434
"k8s.io/client-go/rest"
35+
"k8s.io/client-go/tools/events"
3536
"k8s.io/client-go/tools/leaderelection"
3637
"k8s.io/client-go/tools/leaderelection/resourcelock"
37-
"k8s.io/client-go/tools/record"
3838

3939
"sigs.k8s.io/controller-runtime/pkg/cache"
4040
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -255,8 +255,8 @@ func (cm *controllerManager) GetCache() cache.Cache {
255255
return cm.cluster.GetCache()
256256
}
257257

258-
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
259-
return cm.cluster.GetEventRecorderFor(name)
258+
func (cm *controllerManager) GetEventRecorder(name string) events.EventRecorder {
259+
return cm.cluster.GetEventRecorder(name)
260260
}
261261

262262
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {

0 commit comments

Comments
 (0)