1
1
package main
2
2
3
3
import (
4
+ "context"
4
5
"encoding/json"
5
6
"flag"
7
+ "github.com/hashicorp/golang-lru/v2/expirable"
8
+ "github.com/prometheus/client_golang/prometheus/promauto"
9
+ "k8s.io/apimachinery/pkg/api/meta"
10
+ "k8s.io/apimachinery/pkg/runtime/schema"
11
+ "k8s.io/client-go/discovery"
12
+ "k8s.io/client-go/dynamic"
13
+ "k8s.io/client-go/rest"
14
+ "k8s.io/client-go/restmapper"
6
15
"log"
16
+ "net/http"
7
17
"os"
18
+ "time"
19
+
20
+ "github.com/prometheus/client_golang/prometheus"
21
+ "github.com/prometheus/client_golang/prometheus/promhttp"
8
22
9
23
corev1 "k8s.io/api/core/v1"
10
24
"k8s.io/apimachinery/pkg/fields"
11
25
"k8s.io/client-go/kubernetes"
12
26
"k8s.io/client-go/tools/cache"
13
27
"k8s.io/client-go/tools/clientcmd"
28
+
29
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
+ "k8s.io/client-go/discovery/cached/memory"
31
+
32
+ "github.com/hashicorp/golang-lru/v2"
14
33
)
15
34
16
35
var (
17
- ignoreNormal = flag .Bool ("ignore-normal" , false , "ignore events of type 'Normal' to reduce noise" )
18
- ignoreUpdate = flag .Bool ("ignore-update" , true , "ignore update of events" )
36
+ ignoreNormal = flag .Bool ("ignore-normal" , false , "ignore events of type 'Normal' to reduce noise" )
37
+ ignoreUpdate = flag .Bool ("ignore-update" , true , "ignore update of events" )
38
+ metricsEnabled = flag .Bool ("metrics-enabled" , false , "expose Prometheus metrics on :8080/metrics" )
39
+ )
40
+
41
+ var (
42
+ eventsTotal = promauto .NewCounterVec (
43
+ prometheus.CounterOpts {
44
+ Name : "k8s_event_logger_q_k8s_events_total" ,
45
+ Help : "Total number of Kubernetes events processed" ,
46
+ },
47
+ []string {"type" , "reason" , "object_kind" , "qovery_project_id" , "qovery_environment_id" , "qovery_service_id" },
48
+ )
49
+ eventsHit = promauto .NewCounterVec (
50
+ prometheus.CounterOpts {
51
+ Name : "k8s_event_logger_q_cache_event_total" ,
52
+ Help : "Cache hit/miss events for object label fetching" ,
53
+ },
54
+ []string {"cache_type" },
55
+ )
19
56
)
20
57
21
58
func main () {
@@ -24,6 +61,16 @@ func main() {
24
61
loggerApplication := log .New (os .Stderr , "" , log .LstdFlags )
25
62
loggerEvent := log .New (os .Stdout , "" , 0 )
26
63
64
+ if * metricsEnabled {
65
+ go func () {
66
+ http .Handle ("/metrics" , promhttp .Handler ())
67
+ loggerApplication .Printf ("Prometheus endpoint listening on :8080/metrics" )
68
+ if err := http .ListenAndServe (":8080" , nil ); err != nil {
69
+ loggerApplication .Fatalf ("metrics server: %v" , err )
70
+ }
71
+ }()
72
+ }
73
+
27
74
// Using First sample from https://pkg.go.dev/k8s.io/client-go/tools/clientcmd to automatically deal with environment variables and default file paths
28
75
29
76
loadingRules := clientcmd .NewDefaultClientConfigLoadingRules ()
@@ -39,6 +86,11 @@ func main() {
39
86
loggerApplication .Panicln (err .Error ())
40
87
}
41
88
89
+ fetcher , err := NewObjectLabelFetcher (config , 10_000 , 30 * time .Minute )
90
+ if err != nil {
91
+ loggerApplication .Fatalf ("failed to build label fetcher: %v" , err )
92
+ }
93
+
42
94
// Note that this *should* automatically sanitize sensitive fields
43
95
loggerApplication .Println ("Using configuration:" , config .String ())
44
96
@@ -59,16 +111,20 @@ func main() {
59
111
0 ,
60
112
cache.ResourceEventHandlerFuncs {
61
113
AddFunc : func (obj interface {}) {
62
- if * ignoreNormal && obj .(* corev1.Event ).Type == corev1 .EventTypeNormal {
114
+ evt := obj .(* corev1.Event )
115
+ if * ignoreNormal && evt .Type == corev1 .EventTypeNormal {
63
116
return
64
117
}
65
118
logEvent (obj , loggerEvent )
119
+ recordMetric (evt , fetcher )
66
120
},
67
121
UpdateFunc : func (oldObj , newObj interface {}) {
68
- if * ignoreUpdate || (* ignoreNormal && newObj .(* corev1.Event ).Type == corev1 .EventTypeNormal ) {
122
+ evt := newObj .(* corev1.Event )
123
+ if * ignoreUpdate || (* ignoreNormal && evt .Type == corev1 .EventTypeNormal ) {
69
124
return
70
125
}
71
126
logEvent (newObj , loggerEvent )
127
+ recordMetric (evt , fetcher )
72
128
},
73
129
},
74
130
)
@@ -83,3 +139,130 @@ func logEvent(obj interface{}, logger *log.Logger) {
83
139
j , _ := json .Marshal (obj )
84
140
logger .Printf ("%s\n " , string (j ))
85
141
}
142
+
143
+ func recordMetric (evt * corev1.Event , fetcher * ObjectLabelFetcher ) {
144
+ if ! * metricsEnabled {
145
+ return
146
+ }
147
+
148
+ qoveryProjectId := ""
149
+ qoveryEnvId := ""
150
+ qoveryServiceId := ""
151
+
152
+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
153
+ defer cancel ()
154
+
155
+ labels , err := fetcher .LabelsForEvent (ctx , evt )
156
+ if err == nil {
157
+ qoveryProjectId = labels ["qovery.com/project-id" ]
158
+ qoveryEnvId = labels ["qovery.com/environment-id" ]
159
+ qoveryServiceId = labels ["qovery.com/service-id" ]
160
+ }
161
+
162
+ eventsTotal .
163
+ WithLabelValues (evt .Type , evt .Reason , evt .InvolvedObject .Kind , qoveryProjectId , qoveryEnvId , qoveryServiceId ).
164
+ Inc ()
165
+ }
166
+
167
+ type cacheKey string
168
+
169
+ func keyFromEvent (evt * corev1.Event ) cacheKey {
170
+ return cacheKey (evt .InvolvedObject .UID )
171
+ }
172
+
173
+ type ObjectLabelFetcher struct {
174
+ dynClient dynamic.Interface
175
+ mapper meta.RESTMapper
176
+ cache labelCache [cacheKey , map [string ]string ]
177
+ }
178
+
179
+ type labelCache [K comparable , V any ] interface {
180
+ Get (K ) (V , bool )
181
+ Add (K , V ) bool
182
+ }
183
+
184
+ func NewObjectLabelFetcher (
185
+ cfg * rest.Config ,
186
+ maxEntries int ,
187
+ ttl time.Duration ,
188
+ ) (* ObjectLabelFetcher , error ) {
189
+ dynClient , err := dynamic .NewForConfig (cfg )
190
+ if err != nil {
191
+ return nil , err
192
+ }
193
+
194
+ disco , err := discovery .NewDiscoveryClientForConfig (cfg )
195
+ if err != nil {
196
+ return nil , err
197
+ }
198
+
199
+ mapper := restmapper .NewDeferredDiscoveryRESTMapper (memory .NewMemCacheClient (disco ))
200
+
201
+ // ----- creating the LRU cache (with or without TTL) -----
202
+ var c labelCache [cacheKey , map [string ]string ]
203
+ if ttl > 0 {
204
+ c = expirable .NewLRU [cacheKey , map [string ]string ](maxEntries , nil , ttl )
205
+ } else {
206
+ lruCache , err := lru.New [cacheKey , map [string ]string ](maxEntries )
207
+ if err != nil {
208
+ return nil , err
209
+ }
210
+ c = lruCache
211
+ }
212
+
213
+ return & ObjectLabelFetcher {
214
+ dynClient : dynClient ,
215
+ mapper : mapper ,
216
+ cache : c ,
217
+ }, nil
218
+ }
219
+
220
+ func (f * ObjectLabelFetcher ) LabelsForEvent (ctx context.Context , evt * corev1.Event ) (map [string ]string , error ) {
221
+ key := keyFromEvent (evt )
222
+
223
+ // Fast-path: cache hit
224
+ if lbls , ok := f .cache .Get (key ); ok {
225
+ eventsHit .WithLabelValues ("hit" ).Inc ()
226
+ return lbls , nil
227
+ }
228
+ eventsHit .WithLabelValues ("miss" ).Inc ()
229
+
230
+ // Resolve the GroupVersionKind from the Event.
231
+ gvk := schema .FromAPIVersionAndKind (evt .InvolvedObject .APIVersion , evt .InvolvedObject .Kind )
232
+
233
+ // Translate GVK ➜ GroupVersionResource via the RESTMapper.
234
+ mapping , err := f .mapper .RESTMapping (gvk .GroupKind (), gvk .Version )
235
+ if err != nil {
236
+ return nil , err
237
+ }
238
+
239
+ // Pick the correct dynamic ResourceInterface (namespaced or cluster-wide).
240
+ var dr dynamic.ResourceInterface
241
+ if mapping .Scope .Name () == meta .RESTScopeNameNamespace {
242
+ ns := evt .InvolvedObject .Namespace
243
+ if ns == "" {
244
+ ns = evt .Namespace
245
+ }
246
+ dr = f .dynClient .Resource (mapping .Resource ).Namespace (ns )
247
+ } else {
248
+ dr = f .dynClient .Resource (mapping .Resource )
249
+ }
250
+
251
+ // Retrieve the actual object (no need to unmarshal into a typed struct).
252
+ obj , err := dr .Get (ctx , evt .InvolvedObject .Name , metav1.GetOptions {})
253
+ if err != nil {
254
+ return nil , err
255
+ }
256
+
257
+ // Use the meta.Accessor helper to read generic metadata, including labels.
258
+ accessor , err := meta .Accessor (obj )
259
+ if err != nil {
260
+ return nil , err
261
+ }
262
+ labels := accessor .GetLabels ()
263
+
264
+ // Store in cache (eviction handled automatically)
265
+ f .cache .Add (key , labels )
266
+
267
+ return labels , nil
268
+ }
0 commit comments