Skip to content

Commit 2ebd17f

Browse files
authored
Merge pull request #3262 from clebs/new-events-api
⚠️ Migration to the new events API
2 parents 3941708 + 572fad4 commit 2ebd17f

File tree

12 files changed

+297
-76
lines changed

12 files changed

+297
-76
lines changed

pkg/cluster/cluster.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,30 @@ package cluster
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"net/http"
2324

2425
"github.com/go-logr/logr"
2526
"k8s.io/apimachinery/pkg/api/meta"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/client-go/kubernetes/scheme"
29+
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
2830
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/tools/events"
2932
"k8s.io/client-go/tools/record"
3033

3134
"sigs.k8s.io/controller-runtime/pkg/cache"
3235
"sigs.k8s.io/controller-runtime/pkg/client"
3336
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3437
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
3538
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
39+
"sigs.k8s.io/controller-runtime/pkg/recorder"
3640
)
3741

3842
// Cluster provides various methods to interact with a cluster.
3943
type Cluster interface {
44+
recorder.Provider
45+
4046
// GetHTTPClient returns an HTTP client that can be used to talk to the apiserver
4147
GetHTTPClient() *http.Client
4248

@@ -58,9 +64,6 @@ type Cluster interface {
5864
// GetFieldIndexer returns a client.FieldIndexer configured with the client
5965
GetFieldIndexer() client.FieldIndexer
6066

61-
// GetEventRecorderFor returns a new EventRecorder for the provided name
62-
GetEventRecorderFor(name string) record.EventRecorder
63-
6467
// GetRESTMapper returns a RESTMapper
6568
GetRESTMapper() meta.RESTMapper
6669

@@ -160,8 +163,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
160163
}
161164
options, err := setOptionsDefaults(options, config)
162165
if err != nil {
163-
options.Logger.Error(err, "Failed to set defaults")
164-
return nil, err
166+
return nil, fmt.Errorf("failed setting cluster default options: %w", err)
165167
}
166168

167169
// Create the mapper provider
@@ -281,16 +283,24 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
281283
options.newRecorderProvider = intrec.NewProvider
282284
}
283285

286+
// This is duplicated with pkg/manager, we need it here to provide
287+
// the user with an EventBroadcaster and there for the Leader election
288+
evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient)
289+
if err != nil {
290+
return options, err
291+
}
292+
284293
// This is duplicated with pkg/manager, we need it here to provide
285294
// the user with an EventBroadcaster and there for the Leader election
286295
if options.EventBroadcaster == nil {
287296
// defer initialization to avoid leaking by default
288-
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
289-
return record.NewBroadcaster(), true
297+
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
298+
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
290299
}
291300
} else {
292-
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
293-
return options.EventBroadcaster, false
301+
// keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one.
302+
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
303+
return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false
294304
}
295305
}
296306

pkg/cluster/cluster_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ var _ = Describe("cluster.Cluster", func() {
4040
c, err := New(nil)
4141
Expect(c).To(BeNil())
4242
Expect(err.Error()).To(ContainSubstring("must specify Config"))
43-
4443
})
4544

4645
It("should return an error if it can't create a RestMapper", func() {
@@ -50,7 +49,6 @@ var _ = Describe("cluster.Cluster", func() {
5049
})
5150
Expect(c).To(BeNil())
5251
Expect(err).To(Equal(expected))
53-
5452
})
5553

5654
It("should return an error it can't create a client.Client", func() {
@@ -96,7 +94,6 @@ var _ = Describe("cluster.Cluster", func() {
9694
Expect(err).To(HaveOccurred())
9795
Expect(err.Error()).To(ContainSubstring("expected error"))
9896
})
99-
10097
})
10198

10299
Describe("Start", func() {
@@ -160,7 +157,13 @@ var _ = Describe("cluster.Cluster", func() {
160157
It("should provide a function to get the EventRecorder", func() {
161158
c, err := New(cfg)
162159
Expect(err).NotTo(HaveOccurred())
163-
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil())
160+
Expect(c.GetEventRecorder("test")).NotTo(BeNil())
161+
})
162+
163+
It("should provide a function to get the deprecated EventRecorder", func() {
164+
c, err := New(cfg)
165+
Expect(err).NotTo(HaveOccurred())
166+
Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) //nolint:staticcheck
164167
})
165168
It("should provide a function to get the APIReader", func() {
166169
c, err := New(cfg)

pkg/cluster/internal.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/tools/events"
2728
"k8s.io/client-go/tools/record"
2829

2930
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -87,6 +88,10 @@ func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder {
8788
return c.recorderProvider.GetEventRecorderFor(name)
8889
}
8990

91+
func (c *cluster) GetEventRecorder(name string) events.EventRecorder {
92+
return c.recorderProvider.GetEventRecorder(name)
93+
}
94+
9095
func (c *cluster) GetRESTMapper() meta.RESTMapper {
9196
return c.mapper
9297
}

pkg/internal/recorder/recorder.go

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,19 @@ import (
2424

2525
"github.com/go-logr/logr"
2626
corev1 "k8s.io/api/core/v1"
27+
eventsv1 "k8s.io/api/events/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
2829
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
2930
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/tools/events"
3032
"k8s.io/client-go/tools/record"
3133
)
3234

3335
// EventBroadcasterProducer makes an event broadcaster, returning
3436
// whether or not the broadcaster should be stopped with the Provider,
3537
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
36-
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
38+
// This producer currently produces both an old API and a new API broadcaster.
39+
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool)
3740

3841
// Provider is a recorder.Provider that records events to the k8s API server
3942
// and to a logr Logger.
@@ -48,9 +51,13 @@ type Provider struct {
4851
evtClient corev1client.EventInterface
4952
makeBroadcaster EventBroadcasterProducer
5053

51-
broadcasterOnce sync.Once
52-
broadcaster record.EventBroadcaster
53-
stopBroadcaster bool
54+
broadcasterOnce sync.Once
55+
broadcaster events.EventBroadcaster
56+
cancelSinkRecordingFunc context.CancelFunc
57+
stopWatcherFunc func()
58+
// Deprecated: will be removed in a future release. Use the broadcaster above instead.
59+
deprecatedBroadcaster record.EventBroadcaster
60+
stopBroadcaster bool
5461
}
5562

5663
// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
@@ -71,10 +78,13 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
7178
// almost certainly already been started (e.g. by leader election). We
7279
// need to invoke this to ensure that we don't inadvertently race with
7380
// an invocation of getBroadcaster.
74-
broadcaster := p.getBroadcaster()
81+
deprecatedBroadcaster, broadcaster := p.getBroadcaster()
7582
if p.stopBroadcaster {
7683
p.lock.Lock()
7784
broadcaster.Shutdown()
85+
p.cancelSinkRecordingFunc()
86+
p.stopWatcherFunc()
87+
deprecatedBroadcaster.Shutdown()
7888
p.stopped = true
7989
p.lock.Unlock()
8090
}
@@ -89,25 +99,45 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
8999

90100
// getBroadcaster ensures that a broadcaster is started for this
91101
// provider, and returns it. It's threadsafe.
92-
func (p *Provider) getBroadcaster() record.EventBroadcaster {
102+
func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) {
93103
// NB(directxman12): this can technically still leak if something calls
94104
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
95105
// create the broadcaster in start, we could race with other things that
96106
// are started at the same time & want to emit events. The alternative is
97107
// silently swallowing events and more locking, but that seems suboptimal.
98108

99109
p.broadcasterOnce.Do(func() {
100-
broadcaster, stop := p.makeBroadcaster()
101-
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
102-
broadcaster.StartEventWatcher(
110+
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster()
111+
112+
// init deprecated broadcaster
113+
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
114+
p.deprecatedBroadcaster.StartEventWatcher(
103115
func(e *corev1.Event) {
104116
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
105117
})
106-
p.broadcaster = broadcaster
107-
p.stopBroadcaster = stop
118+
119+
// init new broadcaster
120+
ctx, cancel := context.WithCancel(context.Background())
121+
p.cancelSinkRecordingFunc = cancel
122+
if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
123+
p.logger.Error(err, "error starting recording for broadcaster")
124+
return
125+
}
126+
127+
stopWatcher, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) {
128+
e, isEvt := event.(*eventsv1.Event)
129+
if isEvt {
130+
p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason)
131+
}
132+
})
133+
if err != nil {
134+
p.logger.Error(err, "error starting event watcher for broadcaster")
135+
}
136+
137+
p.stopWatcherFunc = stopWatcher
108138
})
109139

110-
return p.broadcaster
140+
return p.deprecatedBroadcaster, p.broadcaster
111141
}
112142

113143
// NewProvider create a new Provider instance.
@@ -128,6 +158,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S
128158
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
129159
// broadcaster. All events will be associated with a component of the given name.
130160
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
161+
return &deprecatedRecorder{
162+
prov: p,
163+
name: name,
164+
}
165+
}
166+
167+
// GetEventRecorder returns an event recorder that broadcasts to this provider's
168+
// broadcaster. All events will be associated with a component of the given name.
169+
func (p *Provider) GetEventRecorder(name string) events.EventRecorder {
131170
return &lazyRecorder{
132171
prov: p,
133172
name: name,
@@ -141,18 +180,47 @@ type lazyRecorder struct {
141180
name string
142181

143182
recOnce sync.Once
144-
rec record.EventRecorder
183+
rec events.EventRecorder
145184
}
146185

147186
// ensureRecording ensures that a concrete recorder is populated for this recorder.
148187
func (l *lazyRecorder) ensureRecording() {
149188
l.recOnce.Do(func() {
150-
broadcaster := l.prov.getBroadcaster()
151-
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
189+
_, broadcaster := l.prov.getBroadcaster()
190+
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name)
152191
})
153192
}
154193

155-
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
194+
func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) {
195+
l.ensureRecording()
196+
197+
l.prov.lock.RLock()
198+
if !l.prov.stopped {
199+
l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...)
200+
}
201+
l.prov.lock.RUnlock()
202+
}
203+
204+
// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release.
205+
//
206+
// Deprecated: will be removed in a future release.
207+
type deprecatedRecorder struct {
208+
prov *Provider
209+
name string
210+
211+
recOnce sync.Once
212+
rec record.EventRecorder
213+
}
214+
215+
// ensureRecording ensures that a concrete recorder is populated for this recorder.
216+
func (l *deprecatedRecorder) ensureRecording() {
217+
l.recOnce.Do(func() {
218+
deprecatedBroadcaster, _ := l.prov.getBroadcaster()
219+
l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
220+
})
221+
}
222+
223+
func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) {
156224
l.ensureRecording()
157225

158226
l.prov.lock.RLock()
@@ -161,7 +229,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s
161229
}
162230
l.prov.lock.RUnlock()
163231
}
164-
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
232+
233+
func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) {
165234
l.ensureRecording()
166235

167236
l.prov.lock.RLock()
@@ -170,7 +239,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF
170239
}
171240
l.prov.lock.RUnlock()
172241
}
173-
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
242+
243+
func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) {
174244
l.ensureRecording()
175245

176246
l.prov.lock.RLock()

0 commit comments

Comments
 (0)