From 15e87cb49519afc58c05068c25c39acb0825ce07 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Mon, 16 Sep 2024 08:29:40 +0200 Subject: [PATCH 01/21] Verify PR titles with shell script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Büringer <buringerst@vmware.com> --- .github/workflows/verify.yml | 20 ++++++------- hack/verify-pr-title.sh | 54 ++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 10 deletions(-) create mode 100755 hack/verify-pr-title.sh diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index e24f962101..a5a3e85c7b 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -1,17 +1,17 @@ +name: PR title verifier + on: pull_request_target: - types: [opened, edited, reopened, synchronize] - -permissions: - checks: write # Allow access to checks to write check runs. + types: [opened, edited, synchronize, reopened] jobs: verify: runs-on: ubuntu-latest - name: verify PR contents + steps: - - name: Verifier action - id: verifier - uses: kubernetes-sigs/kubebuilder-release-tools@012269a88fa4c034a0acf1ba84c26b195c0dbab4 # tag=v0.4.3 - with: - github_token: ${{ secrets.GITHUB_TOKEN }} + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # tag=v4.1.7 + + - name: Check if PR title is valid + run: | + ./hack/verify-pr-title.sh "${{ github.event.pull_request.title }}" + diff --git a/hack/verify-pr-title.sh b/hack/verify-pr-title.sh new file mode 100755 index 0000000000..a556b0172b --- /dev/null +++ b/hack/verify-pr-title.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define regex patterns +WIP_REGEX="^\W?WIP\W" +TAG_REGEX="^\[[[:alnum:]\._-]*\]" +PR_TITLE="$1" + +# Trim WIP and tags from title +trimmed_title=$(echo "$PR_TITLE" | sed -E "s/$WIP_REGEX//" | sed -E "s/$TAG_REGEX//" | xargs) + +# Normalize common emojis in text form to actual emojis +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:warning:/⚠/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:sparkles:/✨/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:bug:/πŸ›/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:book:/πŸ“–/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:rocket:/πŸš€/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:seedling:/🌱/g") + +# Check PR type prefix +if [[ "$trimmed_title" =~ ^(⚠|✨|πŸ›|πŸ“–|πŸš€|🌱) ]]; then + echo "PR title is valid: $trimmed_title" +else + echo "Error: No matching PR type indicator found in title." + echo "You need to have one of these as the prefix of your PR title:" + echo "- Breaking change: ⚠ (:warning:)" + echo "- Non-breaking feature: ✨ (:sparkles:)" + echo "- Patch fix: πŸ› (:bug:)" + echo "- Docs: πŸ“– (:book:)" + echo "- Release: πŸš€ (:rocket:)" + echo "- Infra/Tests/Other: 🌱 (:seedling:)" + exit 1 +fi + +# Check that PR title does not contain Issue or PR number +if [[ "$trimmed_title" =~ \#[0-9]+ ]]; then + echo "Error: PR title should not contain issue or PR number." + echo "Issue numbers belong in the PR body as either \"Fixes #XYZ\" (if it closes the issue or PR), or something like \"Related to #XYZ\" (if it's just related)." 
+ exit 1 +fi + From f0e55afc5bdb093c80e052f3af10db748f462275 Mon Sep 17 00:00:00 2001 From: Kevin McDermott Date: Sun, 15 Sep 2024 20:13:03 +0100 Subject: [PATCH 02/21] Preserve TypeMeta for PartialObjectMeta resources This updates the fake client to retain the PartialObjectMeta TypeMeta when getting resources. --- pkg/client/fake/client.go | 5 ++++- pkg/client/fake/client_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 7366a18528..10be14a9df 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -508,7 +508,10 @@ func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.O return err } - if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { + _, isUnstructured := obj.(runtime.Unstructured) + _, isPartialObject := obj.(*metav1.PartialObjectMetadata) + + if isUnstructured || isPartialObject { gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index e86a64eefc..ae537d5b43 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -339,6 +339,33 @@ var _ = Describe("Fake client", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue()) }) + It("should be able to retrieve objects by PartialObjectMetadata", func() { + By("Creating a Resource") + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + } + err := cl.Create(context.Background(), secret) + Expect(err).ToNot(HaveOccurred()) + + By("Fetching the resource using a PartialObjectMeta") + partialObjMeta := &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + } + partialObjMeta.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret")) + + err = cl.Get(context.Background(), client.ObjectKeyFromObject(partialObjMeta), 
partialObjMeta) + Expect(err).ToNot(HaveOccurred()) + + Expect(partialObjMeta.Kind).To(Equal("Secret")) + Expect(partialObjMeta.APIVersion).To(Equal("v1")) + }) + It("should support filtering by labels and their values", func() { By("Listing deployments with a particular label and value") list := &appsv1.DeploymentList{} From b40036699edf7c81ad45f2af28be6ac3ebab9528 Mon Sep 17 00:00:00 2001 From: Christian Schlotter Date: Fri, 27 Sep 2024 11:07:59 +0200 Subject: [PATCH 03/21] pr-verify: use env var for passing the PR title Co-Authored-By: Aviv Keller --- .github/workflows/verify.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index a5a3e85c7b..dfe953846f 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -12,6 +12,7 @@ jobs: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # tag=v4.1.7 - name: Check if PR title is valid - run: | - ./hack/verify-pr-title.sh "${{ github.event.pull_request.title }}" - + env: + PR_TITLE: ${{ github.event.pull_request.title }} + run: | + ./hack/verify-pr-title.sh "${{ github.event.pull_request.title }}" From 465b62a5b08168851bff62a23f8e634f8e52e1bc Mon Sep 17 00:00:00 2001 From: Christian Schlotter Date: Fri, 27 Sep 2024 12:05:51 +0200 Subject: [PATCH 04/21] pr-verify: use env var for passing the PR title Co-Authored-By: Aviv Keller --- .github/workflows/verify.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index dfe953846f..a66ba0c43f 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -15,4 +15,4 @@ jobs: env: PR_TITLE: ${{ github.event.pull_request.title }} run: | - ./hack/verify-pr-title.sh "${{ github.event.pull_request.title }}" + ./hack/verify-pr-title.sh "${PR_TITLE}" From f883b25b1b48aebcd4a1dc2aa42090e47d32b51d Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Fri, 11 Oct 2024 
15:55:11 +0200 Subject: [PATCH 05/21] Fix PR verify action MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Büringer <buringerst@vmware.com> --- .github/workflows/verify.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index a66ba0c43f..303c28b9d4 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -9,9 +9,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # tag=v4.1.7 + - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # tag=v4.2.1 - - name: Check if PR title is valid + - name: Check if PR title is valid env: PR_TITLE: ${{ github.event.pull_request.title }} run: | From 44214256ba36506ed2a0cb0910c687bc172cf78a Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 12 Oct 2024 13:42:01 -0400 Subject: [PATCH 06/21] bug: Fakeclient: Fix TOCTOU races The fake client currently has a number of time of check time of use races, where it fetches an object to determine what to do in a mutating operation. The problem is that the object might change in between fetching it and doing the mutating operation. Most notably, this happens when: * Patching is done in parallel. 
Only one of the patches will succeed, the other ones will fail with a conflict * Updates of objects that allow unconditional updates: All updates will succeed, but not all of them will increment the resource version (i.e. dirty writes for the RV) * An update for an object that allows createOnUpdate races with a create or delete * A DeleteAllOf call races with Delete calls * A scale update races with a normal update This change: * Adds tests for all of these cases * Fixes them by adding a lock around the write operations, including their read part, if any --- pkg/client/fake/client.go | 38 ++++- pkg/client/fake/client_test.go | 282 ++++++++++++++++++++++++++++++++- 2 files changed, 309 insertions(+), 11 deletions(-) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 10be14a9df..54f5b5d258 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -68,16 +68,21 @@ type versionedTracker struct { } type fakeClient struct { - tracker versionedTracker - scheme *runtime.Scheme + // trackerWriteLock must be acquired before writing to + // the tracker or performing reads that affect a following + // write. + trackerWriteLock sync.Mutex + tracker versionedTracker + + schemeWriteLock sync.Mutex + scheme *runtime.Scheme + restMapper meta.RESTMapper withStatusSubresource sets.Set[schema.GroupVersionKind] // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. 
indexes map[schema.GroupVersionKind]map[string]client.IndexerFunc - - schemeWriteLock sync.Mutex } var _ client.WithWatch = &fakeClient{} @@ -467,6 +472,11 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt switch { case allowsUnconditionalUpdate(gvk): accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) + // This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use + // to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes + // apiserver accepts such a patch, but it does so we just copy that behavior. + // Kubernetes apiserver behavior can be checked like this: + // `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9` case bytes. Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")): // We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change @@ -732,6 +742,8 @@ func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...clie accessor.SetDeletionTimestamp(nil) } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() return c.tracker.Create(gvr, obj, accessor.GetNamespace()) } @@ -753,6 +765,8 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() // Check the ResourceVersion if that Precondition was specified. 
if delOptions.Preconditions != nil && delOptions.Preconditions.ResourceVersion != nil { name := accessor.GetName() @@ -775,7 +789,7 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } } - return c.deleteObject(gvr, accessor) + return c.deleteObjectLocked(gvr, accessor) } func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { @@ -793,6 +807,9 @@ func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts .. } } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) o, err := c.tracker.List(gvr, gvk, dcOptions.Namespace) if err != nil { @@ -812,7 +829,7 @@ func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts .. if err != nil { return err } - err = c.deleteObject(gvr, accessor) + err = c.deleteObjectLocked(gvr, accessor) if err != nil { return err } @@ -842,6 +859,9 @@ func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.Upd if err != nil { return err } + + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() return c.tracker.update(gvr, obj, accessor.GetNamespace(), isStatus, false, *updateOptions.AsUpdateOptions()) } @@ -877,6 +897,8 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client return err } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() oldObj, err := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) if err != nil { return err @@ -1085,7 +1107,7 @@ func (c *fakeClient) SubResource(subResource string) client.SubResourceClient { return &fakeSubResourceClient{client: c, subResource: subResource} } -func (c *fakeClient) deleteObject(gvr schema.GroupVersionResource, accessor metav1.Object) error { +func (c *fakeClient) deleteObjectLocked(gvr schema.GroupVersionResource, accessor metav1.Object) error { old, err := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) if 
err == nil { oldAccessor, err := meta.Accessor(old) @@ -1167,7 +1189,7 @@ func (sw *fakeSubResourceClient) Update(ctx context.Context, obj client.Object, switch sw.subResource { case subResourceScale: - if err := sw.client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil { + if err := sw.client.Get(ctx, client.ObjectKeyFromObject(obj), obj.DeepCopyObject().(client.Object)); err != nil { return err } if updateOptions.SubResourceBody == nil { diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index ae537d5b43..2f5392fda7 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "time" "github.com/google/go-cmp/cmp" @@ -579,7 +580,7 @@ var _ = Describe("Fake client", func() { Expect(obj.ObjectMeta.ResourceVersion).To(Equal("1000")) }) - It("should allow patch with non-set ResourceVersion for a resource that doesn't allow unconditional updates", func() { + It("should allow patch when the patch sets RV to 'null'", func() { schemeBuilder := &scheme.Builder{GroupVersion: schema.GroupVersion{Group: "test", Version: "v1"}} schemeBuilder.Register(&WithPointerMeta{}, &WithPointerMetaList{}) @@ -604,6 +605,7 @@ var _ = Describe("Fake client", func() { "foo": "bar", }, }} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(original))).To(Succeed()) patched := &WithPointerMeta{} @@ -2098,6 +2100,280 @@ var _ = Describe("Fake client", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue()) }) + It("should allow concurrent patches to a configMap", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries) + + for i := range tries { + go func() { + defer wg.Done() + defer 
GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(obj))).To(Succeed()) + }() + } + wg.Wait() + + // While the order is not deterministic, there must be $tries distinct updates + // that each increment the resource version by one + Expect(cl.Get(context.Background(), client.ObjectKey{Name: "foo"}, obj)).To(Succeed()) + Expect(obj.ResourceVersion).To(Equal(strconv.Itoa(tries))) + }) + + It("should not allow concurrent patches to a configMap if the patch contains a ResourceVersion", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + wg := sync.WaitGroup{} + wg.Add(5) + + for i := range 5 { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.ResourceVersion = "1" // include an invalid RV to cause a conflict + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(apierrors.IsConflict(cl.Patch(context.Background(), newObj, client.MergeFrom(obj)))).To(BeTrue()) + }() + } + wg.Wait() + }) + + It("should allow concurrent updates to an object that allows unconditionalUpdate if the incoming request has no RV", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries) + + for i := range tries { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + newObj.ResourceVersion = "" + Expect(cl.Update(context.Background(), 
newObj)).To(Succeed()) + }() + } + wg.Wait() + + // While the order is not deterministic, there must be $tries distinct updates + // that each increment the resource version by one + Expect(cl.Get(context.Background(), client.ObjectKey{Name: "foo"}, obj)).To(Succeed()) + Expect(obj.ResourceVersion).To(Equal(strconv.Itoa(tries))) + }) + + It("If a create races with an update for an object that allows createOnUpdate, the update should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries * 2) + + for i := range tries { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + go func() { + defer wg.Done() + defer GinkgoRecover() + + // this may or may not succeed depending on if we win the race. Either is acceptable, + // but if it fails, it must fail due to an AlreadyExists. + err := cl.Create(context.Background(), obj.DeepCopy()) + if err != nil { + Expect(apierrors.IsAlreadyExists(err)).To(BeTrue()) + } + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This must always succeed, regardless of the outcome of the create. 
+ Expect(cl.Update(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + } + + wg.Wait() + }) + + It("If a delete races with an update for an object that allows createOnUpdate, the update should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries * 2) + + for i := range tries { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), obj.DeepCopy())).To(Succeed()) + + go func() { + defer wg.Done() + defer GinkgoRecover() + + Expect(cl.Delete(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This must always succeed, regardless of if the delete came before or + // after us. + Expect(cl.Update(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + } + + wg.Wait() + }) + + It("If a DeleteAllOf races with a delete, the DeleteAllOf should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const objects = 50 + wg := sync.WaitGroup{} + wg.Add(objects) + + for i := range objects { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), obj.DeepCopy())).To(Succeed()) + } + + for i := range objects { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This may or may not succeed depending on if the DeleteAllOf is faster, + // but if it fails, it should be a not found. 
+ err := cl.Delete(context.Background(), obj) + if err != nil { + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + } + }() + } + Expect(cl.DeleteAllOf(context.Background(), &corev1.Service{})).To(Succeed()) + + wg.Wait() + }) + + It("If an update races with a scale update, only one of them succeeds", func() { + scheme := runtime.NewScheme() + Expect(appsv1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 5000 + for i := range tries { + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), dep)).To(Succeed()) + + wg := sync.WaitGroup{} + wg.Add(2) + var updateSucceeded bool + var scaleSucceeded bool + + go func() { + defer wg.Done() + defer GinkgoRecover() + + dep := dep.DeepCopy() + dep.Annotations = map[string]string{"foo": "bar"} + + // This may or may not fail. If it does fail, it must be a conflict. + err := cl.Update(context.Background(), dep) + if err != nil { + Expect(apierrors.IsConflict(err)).To(BeTrue()) + } else { + updateSucceeded = true + } + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This may or may not fail. If it does fail, it must be a conflict. 
+ scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 10}} + err := cl.SubResource("scale").Update(context.Background(), dep.DeepCopy(), client.WithSubResourceBody(scale)) + if err != nil { + Expect(apierrors.IsConflict(err)).To(BeTrue()) + } else { + scaleSucceeded = true + } + }() + + wg.Wait() + Expect(updateSucceeded).ToNot(Equal(scaleSucceeded)) + } + + }) + It("disallows scale subresources on unsupported built-in types", func() { scheme := runtime.NewScheme() Expect(corev1.AddToScheme(scheme)).To(Succeed()) @@ -2252,8 +2528,8 @@ func (t *WithPointerMetaList) DeepCopyObject() runtime.Object { } type WithPointerMeta struct { - *metav1.TypeMeta - *metav1.ObjectMeta + *metav1.TypeMeta `json:",inline"` + *metav1.ObjectMeta `json:"metadata,omitempty"` } func (t *WithPointerMeta) DeepCopy() *WithPointerMeta { From bfd1cf925238323fd53b372780a3d0a1a750a85c Mon Sep 17 00:00:00 2001 From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com> Date: Thu, 21 Nov 2024 08:32:55 -0800 Subject: [PATCH 07/21] =?UTF-8?q?[release-0.19]=20=E2=9C=A8=20Add=20Enable?= =?UTF-8?q?WatchBookmarks=20option=20to=20cache=20informers=20(#3018)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Ensure all WatchFunc enable watch and bookmarks AllowWatchBookmarks is generally pretty safe to enable as it has been available in Kubernetes for a long while, and the server ignores the flag if it doesn't implement it (per docs). 
Signed-off-by: Vince Prignano * Defaults to false for 0.19 Signed-off-by: Vince Prignano --------- Signed-off-by: Vince Prignano Co-authored-by: Vince Prignano --- pkg/cache/cache.go | 38 ++++++++++++++++++++++++++++++++- pkg/cache/defaulting_test.go | 24 +++++++++++++++++++++ pkg/cache/internal/informers.go | 16 +++++++++++++- 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 706f9c6cdd..406380d329 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -222,6 +222,18 @@ type Options struct { // DefaultNamespaces. DefaultUnsafeDisableDeepCopy *bool + // DefaultEnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // This will be used for all object types, unless it is set in ByObject or + // DefaultNamespaces. + // + // Defaults to false. + DefaultEnableWatchBookmarks *bool + // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // If unset, this will fall through to the Default* settings. ByObject map[client.Object]ByObject @@ -272,6 +284,15 @@ type ByObject struct { // Be very careful with this, when enabled you must DeepCopy any object before mutating it, // otherwise you will mutate the object in the cache. UnsafeDisableDeepCopy *bool + + // EnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // Defaults to false. 
+ EnableWatchBookmarks *bool } // Config describes all potential options for a given watch. @@ -298,6 +319,15 @@ type Config struct { // UnsafeDisableDeepCopy specifies if List and Get requests against the // cache should not DeepCopy. A nil value allows to default this. UnsafeDisableDeepCopy *bool + + // EnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // Defaults to false. + EnableWatchBookmarks *bool } // NewCacheFunc - Function for creating a new cache from the options and a rest config. @@ -367,6 +397,7 @@ func optionDefaultsToConfig(opts *Options) Config { FieldSelector: opts.DefaultFieldSelector, Transform: opts.DefaultTransform, UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy, + EnableWatchBookmarks: opts.DefaultEnableWatchBookmarks, } } @@ -376,6 +407,7 @@ func byObjectToConfig(byObject ByObject) Config { FieldSelector: byObject.Field, Transform: byObject.Transform, UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy, + EnableWatchBookmarks: byObject.EnableWatchBookmarks, } } @@ -398,6 +430,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Transform: config.Transform, WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), + EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, false), NewInformer: opts.newInformer, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, @@ -482,6 +515,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { byObject.Field = defaultedConfig.FieldSelector byObject.Transform = defaultedConfig.Transform byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy + 
byObject.EnableWatchBookmarks = defaultedConfig.EnableWatchBookmarks } opts.ByObject[obj] = byObject @@ -523,7 +557,9 @@ func defaultConfig(toDefault, defaultFrom Config) Config { if toDefault.UnsafeDisableDeepCopy == nil { toDefault.UnsafeDisableDeepCopy = defaultFrom.UnsafeDisableDeepCopy } - + if toDefault.EnableWatchBookmarks == nil { + toDefault.EnableWatchBookmarks = defaultFrom.EnableWatchBookmarks + } return toDefault } diff --git a/pkg/cache/defaulting_test.go b/pkg/cache/defaulting_test.go index 3c01bf8404..8e3033eb47 100644 --- a/pkg/cache/defaulting_test.go +++ b/pkg/cache/defaulting_test.go @@ -224,6 +224,30 @@ func TestDefaultOpts(t *testing.T) { return cmp.Diff(expected, o.ByObject[pod].UnsafeDisableDeepCopy) }, }, + { + name: "ByObject.EnableWatchBookmarks gets defaulted from DefaultEnableWatchBookmarks", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {}}, + DefaultEnableWatchBookmarks: ptr.To(true), + }, + + verification: func(o Options) string { + expected := ptr.To(true) + return cmp.Diff(expected, o.ByObject[pod].EnableWatchBookmarks) + }, + }, + { + name: "ByObject.EnableWatchBookmarks doesn't get defaulted when set", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {EnableWatchBookmarks: ptr.To(false)}}, + DefaultEnableWatchBookmarks: ptr.To(true), + }, + + verification: func(o Options) string { + expected := ptr.To(false) + return cmp.Diff(expected, o.ByObject[pod].EnableWatchBookmarks) + }, + }, { name: "DefaultNamespace label selector gets defaulted from DefaultLabelSelector", in: Options{ diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index cd8c6774ca..a40382d6f3 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -51,6 +51,7 @@ type InformersOpts struct { Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool + EnableWatchBookmarks bool WatchErrorHandler cache.WatchErrorHandler } @@ -78,6 +79,7 @@ func NewInformers(config 
*rest.Config, options *InformersOpts) *Informers { selector: options.Selector, transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, + enableWatchBookmarks: options.EnableWatchBookmarks, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, } @@ -174,6 +176,7 @@ type Informers struct { selector Selector transform cache.TransformFunc unsafeDisableDeepCopy bool + enableWatchBookmarks bool // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer @@ -361,8 +364,10 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O return listWatcher.ListFunc(opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + + ip.selector.ApplyToList(&opts) return listWatcher.WatchFunc(opts) }, }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{ @@ -444,6 +449,9 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + if namespace != "" { return resources.Namespace(namespace).Watch(ip.ctx, opts) } @@ -486,6 +494,9 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + if namespace != "" { watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts) } else { @@ -527,6 +538,9 @@ func (ip *Informers) 
makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + // Build the request. req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) if namespace != "" { From 01707429e75ad5588b874cf33cb888f0476a6659 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 29 Nov 2024 21:38:41 -0500 Subject: [PATCH 08/21] warning: Use leader elector with client timeout This change makes the leader elector use a client that internally has a smaller timeout than the renew deadline, which avoids a situation where a single request timing out makes us lose the leader lease. --- pkg/leaderelection/leader_election.go | 24 +++++++++--------------- pkg/manager/manager.go | 1 + pkg/manager/manager_test.go | 22 ++++++++++++++++++++++ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index ee4fcf4cbe..5a41f5c747 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -20,10 +20,9 @@ import ( "errors" "fmt" "os" + "time" "k8s.io/apimachinery/pkg/util/uuid" - coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -49,6 +48,9 @@ type Options struct { // LeaderElectionID determines the name of the resource that leader election // will use for holding the leader lock. LeaderElectionID string + + // RewnewDeadline is the renew deadline for this leader election client + RewnewDeadline time.Duration } // NewResourceLock creates a new resource lock for use in a leader election loop. 
@@ -88,25 +90,17 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op // Construct clients for leader election rest.AddUserAgent(config, "leader-election") - corev1Client, err := corev1client.NewForConfig(config) - if err != nil { - return nil, err - } - - coordinationClient, err := coordinationv1client.NewForConfig(config) - if err != nil { - return nil, err - } - return resourcelock.New(options.LeaderElectionResourceLock, + return resourcelock.NewFromKubeconfig(options.LeaderElectionResourceLock, options.LeaderElectionNamespace, options.LeaderElectionID, - corev1Client, - coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, EventRecorder: recorderProvider.GetEventRecorderFor(id), - }) + }, + config, + options.RewnewDeadline, + ) } func getInClusterNamespace() (string, error) { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 3166f4818f..8e5d3bc8a7 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -389,6 +389,7 @@ func New(config *rest.Config, options Options) (Manager, error) { LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, + RewnewDeadline: *options.RenewDeadline, }) if err != nil { return nil, err diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index c42d2f2ae7..792bc4f967 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -317,6 +317,28 @@ var _ = Describe("manger.Manager", func() { <-m2done }) + It("should default RenewDeadline for leader election config", func() { + var rl resourcelock.Interface + m1, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-id", + newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) { + if options.RewnewDeadline != 
10*time.Second { + return nil, fmt.Errorf("expected RenewDeadline to be 10s, got %v", options.RewnewDeadline) + } + var err error + rl, err = leaderelection.NewResourceLock(config, recorderProvider, options) + return rl, err + }, + HealthProbeBindAddress: "0", + Metrics: metricsserver.Options{BindAddress: "0"}, + PprofBindAddress: "0", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(m1).ToNot(BeNil()) + }) + It("should default ID to controller-runtime if ID is not set", func() { var rl resourcelock.Interface m1, err := New(cfg, Options{ From 4bc3811a419e7564daea4baaabc1bae9d4c011df Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Sat, 30 Nov 2024 07:09:43 -0800 Subject: [PATCH 09/21] :bug: Fix RenewDeadline typo in leader election Signed-off-by: Vince Prignano --- pkg/leaderelection/leader_election.go | 6 +++--- pkg/manager/manager.go | 2 +- pkg/manager/manager_test.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 5a41f5c747..69daf94e48 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -49,8 +49,8 @@ type Options struct { // will use for holding the leader lock. LeaderElectionID string - // RewnewDeadline is the renew deadline for this leader election client - RewnewDeadline time.Duration + // RenewDeadline is the renew deadline for this leader election client + RenewDeadline time.Duration } // NewResourceLock creates a new resource lock for use in a leader election loop. 
@@ -99,7 +99,7 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op EventRecorder: recorderProvider.GetEventRecorderFor(id), }, config, - options.RewnewDeadline, + options.RenewDeadline, ) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 8e5d3bc8a7..92906fe6ca 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -389,7 +389,7 @@ func New(config *rest.Config, options Options) (Manager, error) { LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, - RewnewDeadline: *options.RenewDeadline, + RenewDeadline: *options.RenewDeadline, }) if err != nil { return nil, err diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 792bc4f967..6e5353e345 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -324,8 +324,8 @@ var _ = Describe("manger.Manager", func() { LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id", newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) { - if options.RewnewDeadline != 10*time.Second { - return nil, fmt.Errorf("expected RenewDeadline to be 10s, got %v", options.RewnewDeadline) + if options.RenewDeadline != 10*time.Second { + return nil, fmt.Errorf("expected RenewDeadline to be 10s, got %v", options.RenewDeadline) } var err error rl, err = leaderelection.NewResourceLock(config, recorderProvider, options) From 2a0ce596d97af3673d20ff8930a60dba89a20ca0 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 1 Dec 2024 11:21:14 -0500 Subject: [PATCH 10/21] :seedling: Make using leader elector with client timeout non-breaking This change is a follow-up to the one that introduces the usage of the leader-elector with client timeout. 
That change was breaking because it introduced a new option and always assumed it was set. This change makes us only use that option if it's actually set. --- pkg/leaderelection/leader_election.go | 35 ++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index 69daf94e48..5cc253917a 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -23,6 +23,8 @@ import ( "time" "k8s.io/apimachinery/pkg/util/uuid" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -49,7 +51,10 @@ type Options struct { // will use for holding the leader lock. LeaderElectionID string - // RenewDeadline is the renew deadline for this leader election client + // RenewDeadline is the renew deadline for this leader election client. + // Must be set to ensure the resource lock has an appropriate client timeout. + // Without that, a single slow response from the API server can result + // in losing leadership. 
RenewDeadline time.Duration } @@ -91,15 +96,37 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op // Construct clients for leader election rest.AddUserAgent(config, "leader-election") - return resourcelock.NewFromKubeconfig(options.LeaderElectionResourceLock, + if options.RenewDeadline != 0 { + return resourcelock.NewFromKubeconfig(options.LeaderElectionResourceLock, + options.LeaderElectionNamespace, + options.LeaderElectionID, + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorderProvider.GetEventRecorderFor(id), + }, + config, + options.RenewDeadline, + ) + } + + corev1Client, err := corev1client.NewForConfig(config) + if err != nil { + return nil, err + } + + coordinationClient, err := coordinationv1client.NewForConfig(config) + if err != nil { + return nil, err + } + return resourcelock.New(options.LeaderElectionResourceLock, options.LeaderElectionNamespace, options.LeaderElectionID, + corev1Client, + coordinationClient, resourcelock.ResourceLockConfig{ Identity: id, EventRecorder: recorderProvider.GetEventRecorderFor(id), }, - config, - options.RenewDeadline, ) } From e727239ea3dea3c649e405af40af172844e7bd4e Mon Sep 17 00:00:00 2001 From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com> Date: Mon, 2 Dec 2024 07:38:59 -0800 Subject: [PATCH 11/21] =?UTF-8?q?[release-0.19]=20=F0=9F=90=9B=20Refactor?= =?UTF-8?q?=20certificate=20watcher=20to=20use=20polling,=20instead=20of?= =?UTF-8?q?=20fsnotify=20(#3023)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reestablish watch for the certificate paths * Remove fsnotify and use cached read watcher * Simplify return * 🐛Fix certwatcher test to be backwards compatible --------- Co-authored-by: Maxim Muzafarov --- examples/scratch-env/go.mod | 1 - examples/scratch-env/go.sum | 2 - go.mod | 2 +- pkg/certwatcher/certwatcher.go | 161 ++++++++-------------- 
pkg/certwatcher/certwatcher_suite_test.go | 1 + pkg/certwatcher/certwatcher_test.go | 51 +++++-- pkg/certwatcher/example_test.go | 2 +- pkg/certwatcher/metrics/metrics.go | 1 + 8 files changed, 101 insertions(+), 120 deletions(-) diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index dceb4d12aa..8cf3ce725b 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -14,7 +14,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 89d30c15c1..a920487db5 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -13,8 +13,6 @@ github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8 github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= -github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= -github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= diff --git a/go.mod b/go.mod index 3fd1aa9562..a0a01baa74 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.0 require ( github.com/evanphx/json-patch/v5 v5.9.0 - github.com/fsnotify/fsnotify v1.7.0 
github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 @@ -40,6 +39,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/pkg/certwatcher/certwatcher.go b/pkg/certwatcher/certwatcher.go index fe15fc0dd7..d295b29864 100644 --- a/pkg/certwatcher/certwatcher.go +++ b/pkg/certwatcher/certwatcher.go @@ -17,58 +17,55 @@ limitations under the License. package certwatcher import ( + "bytes" "context" "crypto/tls" - "fmt" + "os" "sync" "time" - "github.com/fsnotify/fsnotify" - kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) var log = logf.RuntimeLog.WithName("certwatcher") -// CertWatcher watches certificate and key files for changes. When either file -// changes, it reads and parses both and calls an optional callback with the new -// certificate. +const defaultWatchInterval = 10 * time.Second + +// CertWatcher watches certificate and key files for changes. +// It always returns the cached version, +// but periodically reads and parses certificate and key for changes +// and calls an optional callback with the new certificate. type CertWatcher struct { sync.RWMutex currentCert *tls.Certificate - watcher *fsnotify.Watcher + interval time.Duration certPath string keyPath string + cachedKeyPEMBlock []byte + // callback is a function to be invoked when the certificate changes. callback func(tls.Certificate) } // New returns a new CertWatcher watching the given certificate and key. 
func New(certPath, keyPath string) (*CertWatcher, error) { - var err error - cw := &CertWatcher{ certPath: certPath, keyPath: keyPath, + interval: defaultWatchInterval, } - // Initial read of certificate and key. - if err := cw.ReadCertificate(); err != nil { - return nil, err - } - - cw.watcher, err = fsnotify.NewWatcher() - if err != nil { - return nil, err - } + return cw, cw.ReadCertificate() +} - return cw, nil +// WithWatchInterval sets the watch interval and returns the CertWatcher pointer +func (cw *CertWatcher) WithWatchInterval(interval time.Duration) *CertWatcher { + cw.interval = interval + return cw } // RegisterCallback registers a callback to be invoked when the certificate changes. @@ -91,72 +88,64 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, // Start starts the watch on the certificate and key files. func (cw *CertWatcher) Start(ctx context.Context) error { - files := sets.New(cw.certPath, cw.keyPath) - - { - var watchErr error - if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { - for _, f := range files.UnsortedList() { - if err := cw.watcher.Add(f); err != nil { - watchErr = err - return false, nil //nolint:nilerr // We want to keep trying. - } - // We've added the watch, remove it from the set. - files.Delete(f) - } - return true, nil - }); err != nil { - return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr})) - } - } - - go cw.Watch() + ticker := time.NewTicker(cw.interval) + defer ticker.Stop() log.Info("Starting certificate watcher") - - // Block until the context is done. - <-ctx.Done() - - return cw.watcher.Close() -} - -// Watch reads events from the watcher's channel and reacts to changes. -func (cw *CertWatcher) Watch() { for { select { - case event, ok := <-cw.watcher.Events: - // Channel is closed. 
- if !ok { - return + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := cw.ReadCertificate(); err != nil { + log.Error(err, "failed read certificate") } + } + } +} - cw.handleEvent(event) - - case err, ok := <-cw.watcher.Errors: - // Channel is closed. - if !ok { - return - } +// updateCachedCertificate checks if the new certificate differs from the cache, +// updates it and returns the result if it was updated or not +func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool { + cw.Lock() + defer cw.Unlock() - log.Error(err, "certificate watch error") - } + if cw.currentCert != nil && + bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) && + bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) { + log.V(7).Info("certificate already cached") + return false } + cw.currentCert = cert + cw.cachedKeyPEMBlock = keyPEMBlock + return true } // ReadCertificate reads the certificate and key files from disk, parses them, -// and updates the current certificate on the watcher. If a callback is set, it +// and updates the current certificate on the watcher if updated. If a callback is set, it // is invoked with the new certificate. 
func (cw *CertWatcher) ReadCertificate() error { metrics.ReadCertificateTotal.Inc() - cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath) + certPEMBlock, err := os.ReadFile(cw.certPath) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + keyPEMBlock, err := os.ReadFile(cw.keyPath) if err != nil { metrics.ReadCertificateErrors.Inc() return err } - cw.Lock() - cw.currentCert = &cert - cw.Unlock() + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + + if !cw.updateCachedCertificate(&cert, keyPEMBlock) { + return nil + } log.Info("Updated current TLS certificate") @@ -170,39 +159,3 @@ func (cw *CertWatcher) ReadCertificate() error { } return nil } - -func (cw *CertWatcher) handleEvent(event fsnotify.Event) { - // Only care about events which may modify the contents of the file. - if !(isWrite(event) || isRemove(event) || isCreate(event) || isChmod(event)) { - return - } - - log.V(1).Info("certificate event", "event", event) - - // If the file was removed or renamed, re-add the watch to the previous name - if isRemove(event) || isChmod(event) { - if err := cw.watcher.Add(event.Name); err != nil { - log.Error(err, "error re-watching file") - } - } - - if err := cw.ReadCertificate(); err != nil { - log.Error(err, "error re-reading certificate") - } -} - -func isWrite(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Write) -} - -func isCreate(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Create) -} - -func isRemove(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Remove) -} - -func isChmod(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Chmod) -} diff --git a/pkg/certwatcher/certwatcher_suite_test.go b/pkg/certwatcher/certwatcher_suite_test.go index 2d0f677685..d0d9a72a62 100644 --- a/pkg/certwatcher/certwatcher_suite_test.go +++ b/pkg/certwatcher/certwatcher_suite_test.go @@ -22,6 +22,7 @@ import ( . 
"github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) diff --git a/pkg/certwatcher/certwatcher_test.go b/pkg/certwatcher/certwatcher_test.go index 1fb247581f..fa7e09adae 100644 --- a/pkg/certwatcher/certwatcher_test.go +++ b/pkg/certwatcher/certwatcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package certwatcher_test import ( + "bytes" "context" "crypto/rand" "crypto/rsa" @@ -34,6 +35,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus/testutil" + "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" ) @@ -80,7 +82,7 @@ var _ = Describe("CertWatcher", func() { go func() { defer GinkgoRecover() defer close(doneCh) - Expect(watcher.Start(ctx)).To(Succeed()) + Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed()) }() // wait till we read first cert Eventually(func() error { @@ -113,7 +115,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -143,7 +145,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -151,6 +153,33 @@ var _ = Describe("CertWatcher", func() { Expect(called.Load()).To(BeNumerically(">=", 1)) }) + It("should reload currentCert after move out", func() { + doneCh := startWatcher() + called := atomic.Int64{} + 
watcher.RegisterCallback(func(crt tls.Certificate) { + called.Add(1) + Expect(crt.Certificate).ToNot(BeEmpty()) + }) + + firstcert, _ := watcher.GetCertificate(nil) + + Expect(os.Rename(certPath, certPath+".old")).To(Succeed()) + Expect(os.Rename(keyPath, keyPath+".old")).To(Succeed()) + + err := writeCerts(certPath, keyPath, "192.168.0.3") + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + secondcert, _ := watcher.GetCertificate(nil) + first := firstcert.PrivateKey.(*rsa.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) + }, "10s", "1s").ShouldNot(BeTrue()) + + ctxCancel() + Eventually(doneCh, "4s").Should(BeClosed()) + Expect(called.Load()).To(BeNumerically(">=", 1)) + }) + Context("prometheus metric read_certificate_total", func() { var readCertificateTotalBefore float64 var readCertificateErrorsBefore float64 @@ -165,8 +194,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) @@ -180,8 +209,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric 
read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } readCertificateTotalBefore = readCertificateTotalAfter return nil @@ -192,15 +221,15 @@ var _ = Describe("CertWatcher", func() { // Note, we are checking two errors here, because os.Remove generates two fsnotify events: Chmod + Remove Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+2.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+2.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) Eventually(func() error { readCertificateErrorsAfter := testutil.ToFloat64(metrics.ReadCertificateErrors) - if readCertificateErrorsAfter != readCertificateErrorsBefore+2.0 { - return fmt.Errorf("metric read certificate errors expected: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) + if readCertificateErrorsAfter < readCertificateErrorsBefore+2.0 { + return fmt.Errorf("metric read certificate errors expected at least: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) } return nil }, "4s").Should(Succeed()) diff --git a/pkg/certwatcher/example_test.go b/pkg/certwatcher/example_test.go index e322aeebfc..e85b2403cb 100644 --- a/pkg/certwatcher/example_test.go +++ b/pkg/certwatcher/example_test.go @@ -39,7 +39,7 @@ func Example() { panic(err) } - // Start goroutine with certwatcher running fsnotify against supplied certdir + // Start goroutine with certwatcher running against supplied cert go func() { if err := watcher.Start(ctx); err != nil { panic(err) diff --git a/pkg/certwatcher/metrics/metrics.go 
b/pkg/certwatcher/metrics/metrics.go index 05869eff03..f128abbcf0 100644 --- a/pkg/certwatcher/metrics/metrics.go +++ b/pkg/certwatcher/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" ) From 2085acc2a1dcf54243922ea26066ee07cd64d9ab Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 2 Dec 2024 08:13:37 -0800 Subject: [PATCH 12/21] add watch deprecated to certwatcher Signed-off-by: Vince Prignano --- pkg/certwatcher/certwatcher.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/certwatcher/certwatcher.go b/pkg/certwatcher/certwatcher.go index d295b29864..f629dd4e16 100644 --- a/pkg/certwatcher/certwatcher.go +++ b/pkg/certwatcher/certwatcher.go @@ -104,6 +104,13 @@ func (cw *CertWatcher) Start(ctx context.Context) error { } } +// Watch used to read events from the watcher's channel and reacts to changes, +// it has currently no function and it's left here for backward compatibility until a future release. +// +// Deprecated: fsnotify has been removed and Start() is now polling instead. 
+func (cw *CertWatcher) Watch() { +} + // updateCachedCertificate checks if the new certificate differs from the cache, // updates it and returns the result if it was updated or not func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool { From 3c5359d2a296c824f9443abf609ccc3013bc5712 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Mon, 30 Dec 2024 09:31:18 -0800 Subject: [PATCH 13/21] Add fsnotify watcher+polling Signed-off-by: Vince Prignano --- examples/scratch-env/go.mod | 1 + examples/scratch-env/go.sum | 2 + go.mod | 2 +- pkg/certwatcher/certwatcher.go | 88 +++++++++++++++++++++++++++++++--- 4 files changed, 85 insertions(+), 8 deletions(-) diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index 8cf3ce725b..dceb4d12aa 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -14,6 +14,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index a920487db5..89d30c15c1 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -13,6 +13,8 @@ github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8 github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= 
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= diff --git a/go.mod b/go.mod index a0a01baa74..3fd1aa9562 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/evanphx/json-patch/v5 v5.9.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 @@ -39,7 +40,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/pkg/certwatcher/certwatcher.go b/pkg/certwatcher/certwatcher.go index f629dd4e16..c323240982 100644 --- a/pkg/certwatcher/certwatcher.go +++ b/pkg/certwatcher/certwatcher.go @@ -20,10 +20,15 @@ import ( "bytes" "context" "crypto/tls" + "fmt" "os" "sync" "time" + "github.com/fsnotify/fsnotify" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) @@ -40,6 +45,7 @@ type CertWatcher struct { sync.RWMutex currentCert *tls.Certificate + watcher *fsnotify.Watcher interval time.Duration certPath string @@ -53,13 +59,25 @@ type CertWatcher struct { // New returns a new CertWatcher watching the given certificate and key. 
func New(certPath, keyPath string) (*CertWatcher, error) { + var err error + cw := &CertWatcher{ certPath: certPath, keyPath: keyPath, interval: defaultWatchInterval, } - return cw, cw.ReadCertificate() + // Initial read of certificate and key. + if err := cw.ReadCertificate(); err != nil { + return nil, err + } + + cw.watcher, err = fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + return cw, nil } // WithWatchInterval sets the watch interval and returns the CertWatcher pointer @@ -88,14 +106,35 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, // Start starts the watch on the certificate and key files. func (cw *CertWatcher) Start(ctx context.Context) error { + files := sets.New(cw.certPath, cw.keyPath) + + { + var watchErr error + if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + for _, f := range files.UnsortedList() { + if err := cw.watcher.Add(f); err != nil { + watchErr = err + return false, nil //nolint:nilerr // We want to keep trying. + } + // We've added the watch, remove it from the set. + files.Delete(f) + } + return true, nil + }); err != nil { + return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr})) + } + } + + go cw.Watch() + ticker := time.NewTicker(cw.interval) defer ticker.Stop() - log.Info("Starting certificate watcher") + log.Info("Starting certificate poll+watcher", "interval", cw.interval) for { select { case <-ctx.Done(): - return nil + return cw.watcher.Close() case <-ticker.C: if err := cw.ReadCertificate(); err != nil { log.Error(err, "failed read certificate") @@ -104,11 +143,26 @@ func (cw *CertWatcher) Start(ctx context.Context) error { } } -// Watch used to read events from the watcher's channel and reacts to changes, -// it has currently no function and it's left here for backward compatibility until a future release. 
-// -// Deprecated: fsnotify has been removed and Start() is now polling instead. +// Watch reads events from the watcher's channel and reacts to changes. func (cw *CertWatcher) Watch() { + for { + select { + case event, ok := <-cw.watcher.Events: + // Channel is closed. + if !ok { + return + } + + cw.handleEvent(event) + case err, ok := <-cw.watcher.Errors: + // Channel is closed. + if !ok { + return + } + + log.Error(err, "certificate watch error") + } + } } // updateCachedCertificate checks if the new certificate differs from the cache, @@ -166,3 +220,23 @@ func (cw *CertWatcher) ReadCertificate() error { } return nil } + +func (cw *CertWatcher) handleEvent(event fsnotify.Event) { + // Only care about events which may modify the contents of the file. + switch { + case event.Op.Has(fsnotify.Write): + case event.Op.Has(fsnotify.Create): + case event.Op.Has(fsnotify.Chmod), event.Op.Has(fsnotify.Remove): + // If the file was removed or renamed, re-add the watch to the previous name + if err := cw.watcher.Add(event.Name); err != nil { + log.Error(err, "error re-watching file") + } + default: + return + } + + log.V(1).Info("certificate event", "event", event) + if err := cw.ReadCertificate(); err != nil { + log.Error(err, "error re-reading certificate") + } +} From 3218a864973c7cea16227c25744898e744bca92c Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Mon, 30 Dec 2024 19:29:01 +0100 Subject: [PATCH 14/21] adjust tests --- pkg/certwatcher/certwatcher_test.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/certwatcher/certwatcher_test.go b/pkg/certwatcher/certwatcher_test.go index fa7e09adae..a47b88ed99 100644 --- a/pkg/certwatcher/certwatcher_test.go +++ b/pkg/certwatcher/certwatcher_test.go @@ -77,12 +77,12 @@ var _ = Describe("CertWatcher", func() { Expect(err).ToNot(HaveOccurred()) }) - startWatcher := func() (done <-chan struct{}) { + startWatcher := func(interval time.Duration) (done <-chan struct{}) { doneCh 
:= make(chan struct{}) go func() { defer GinkgoRecover() defer close(doneCh) - Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed()) + Expect(watcher.WithWatchInterval(interval).Start(ctx)).To(Succeed()) }() // wait till we read first cert Eventually(func() error { @@ -93,14 +93,16 @@ var _ = Describe("CertWatcher", func() { } It("should read the initial cert/key", func() { - doneCh := startWatcher() + // This test verifies the initial read succeeded. So interval doesn't matter. + doneCh := startWatcher(10 * time.Second) ctxCancel() Eventually(doneCh, "4s").Should(BeClosed()) }) It("should reload currentCert when changed", func() { - doneCh := startWatcher() + // This test verifies fsnotify detects the cert change. So interval doesn't matter. + doneCh := startWatcher(10 * time.Second) called := atomic.Int64{} watcher.RegisterCallback(func(crt tls.Certificate) { called.Add(1) @@ -124,7 +126,8 @@ var _ = Describe("CertWatcher", func() { }) It("should reload currentCert when changed with rename", func() { - doneCh := startWatcher() + // This test verifies fsnotify detects the cert change. So interval doesn't matter. + doneCh := startWatcher(10 * time.Second) called := atomic.Int64{} watcher.RegisterCallback(func(crt tls.Certificate) { called.Add(1) @@ -154,7 +157,8 @@ var _ = Describe("CertWatcher", func() { }) It("should reload currentCert after move out", func() { - doneCh := startWatcher() + // This test verifies poll works, so we'll use 1s as interval (fsnotify doesn't detect this change). + doneCh := startWatcher(1 * time.Second) called := atomic.Int64{} watcher.RegisterCallback(func(crt tls.Certificate) { called.Add(1) @@ -190,7 +194,8 @@ var _ = Describe("CertWatcher", func() { }) It("should get updated on successful certificate read", func() { - doneCh := startWatcher() + // This test verifies fsnotify, so interval doesn't matter. 
+ doneCh := startWatcher(10 * time.Second) Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) @@ -205,7 +210,8 @@ var _ = Describe("CertWatcher", func() { }) It("should get updated on read certificate errors", func() { - doneCh := startWatcher() + // This test works with fsnotify, so interval doesn't matter. + doneCh := startWatcher(10 * time.Second) Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) From 2ae6e55223aaa7664236f0a9dec7102ba6966291 Mon Sep 17 00:00:00 2001 From: Christian Schlotter Date: Mon, 20 Jan 2025 11:58:53 +0100 Subject: [PATCH 15/21] cache: clone maps to prevent data race when concurrently creating caches using the same options --- pkg/cache/cache.go | 5 ++++- pkg/cache/defaulting_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 406380d329..9c7bf23255 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -467,6 +467,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } } + opts.ByObject = maps.Clone(opts.ByObject) + opts.DefaultNamespaces = maps.Clone(opts.DefaultNamespaces) for obj, byObject := range opts.ByObject { isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) if err != nil { @@ -478,6 +480,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if isNamespaced && byObject.Namespaces == nil { byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) + } else { + byObject.Namespaces = maps.Clone(byObject.Namespaces) } // Default the namespace-level configs first, because they need to use the undefaulted type-level config @@ -485,7 +489,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { for namespace, config := range byObject.Namespaces { // 1. 
Default from the undefaulted type-level config config = defaultConfig(config, byObjectToConfig(byObject)) - // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but // might not have an entry for the current namespace. if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { diff --git a/pkg/cache/defaulting_test.go b/pkg/cache/defaulting_test.go index 8e3033eb47..d9d0dcceb3 100644 --- a/pkg/cache/defaulting_test.go +++ b/pkg/cache/defaulting_test.go @@ -18,6 +18,7 @@ package cache import ( "reflect" + "sync" "testing" "time" @@ -432,6 +433,34 @@ func TestDefaultOpts(t *testing.T) { } } +func TestDefaultOptsRace(t *testing.T) { + opts := Options{ + Mapper: &fakeRESTMapper{}, + ByObject: map[client.Object]ByObject{ + &corev1.Pod{}: { + Label: labels.SelectorFromSet(map[string]string{"from": "pod"}), + Namespaces: map[string]Config{"default": { + LabelSelector: labels.SelectorFromSet(map[string]string{"from": "pod"}), + }}, + }, + }, + DefaultNamespaces: map[string]Config{"default": {}}, + } + + // Start go routines which re-use the above options struct. + wg := sync.WaitGroup{} + for range 2 { + wg.Add(1) + go func() { + _, _ = defaultOpts(&rest.Config{}, opts) + wg.Done() + }() + } + + // Wait for the go routines to finish. 
+ wg.Wait() +} + type fakeRESTMapper struct { meta.RESTMapper } From 80176664aa7c3301ea985b65b871b341cb9e2c4f Mon Sep 17 00:00:00 2001 From: Tarek Sharafi Date: Thu, 23 Jan 2025 01:04:37 +0200 Subject: [PATCH 16/21] =?UTF-8?q?=F0=9F=90=9Bfix(controller):=20support=20?= =?UTF-8?q?WaitForSync=20in=20custom=20TypedSyncingSource=20(#3084)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🐛fix(controller): use generic WaitForSync There is already support for defining `TypedSyncingSource` but the original code still checks for the original `SyncingSource` before calling `WaitForSync(ctx)` which does not work for custom typed controller. this fix should be backported to v0.19 * test --- pkg/internal/controller/controller.go | 2 +- .../controller/controller_suite_test.go | 10 +-- pkg/internal/controller/controller_test.go | 76 +++++++++++++++++++ 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index dfe407f3b8..1f7752ba6e 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -183,7 +183,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { c.LogConstructor(nil).Info("Starting Controller") for _, watch := range c.startWatches { - syncingSource, ok := watch.(source.SyncingSource) + syncingSource, ok := watch.(source.TypedSyncingSource[request]) if !ok { continue } diff --git a/pkg/internal/controller/controller_suite_test.go b/pkg/internal/controller/controller_suite_test.go index 3143d3dd74..dfa691aeee 100644 --- a/pkg/internal/controller/controller_suite_test.go +++ b/pkg/internal/controller/controller_suite_test.go @@ -42,12 +42,12 @@ var _ = BeforeSuite(func() { testenv = &envtest.Environment{} - var err error - cfg, err = testenv.Start() - Expect(err).NotTo(HaveOccurred()) + // var err error + // cfg, err = testenv.Start() + // Expect(err).NotTo(HaveOccurred()) - clientset, 
err = kubernetes.NewForConfig(cfg) - Expect(err).NotTo(HaveOccurred()) + // clientset, err = kubernetes.NewForConfig(cfg) + // Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 638d21810e..6376a0fcdf 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -46,6 +46,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) +type TestRequest struct { + Key string +} + var _ = Describe("controller", func() { var fakeReconcile *fakeReconciler var ctrl *Controller[reconcile.Request] @@ -323,6 +327,41 @@ var _ = Describe("controller", func() { Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times")) }) + It("should check for correct TypedSyncingSource if custom types are used", func() { + queue := &controllertest.TypedQueue[TestRequest]{ + TypedInterface: workqueue.NewTyped[TestRequest](), + } + ctrl := &Controller[TestRequest]{ + NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] { + return queue + }, + LogConstructor: func(*TestRequest) logr.Logger { + return log.RuntimeLog.WithName("controller").WithName("test") + }, + } + ctrl.CacheSyncTimeout = time.Second + src := &bisignallingSource[TestRequest]{ + startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]), + startDone: make(chan error, 1), + waitCall: make(chan struct{}), + waitDone: make(chan error, 1), + } + ctrl.startWatches = []source.TypedSource[TestRequest]{src} + ctrl.Name = "foo" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + startCh := make(chan error) + go func() { + defer GinkgoRecover() + startCh <- ctrl.Start(ctx) + }() + Eventually(src.startCall).Should(Receive(Equal(queue))) + src.startDone <- nil + 
Eventually(src.waitCall).Should(BeClosed()) + src.waitDone <- nil + cancel() + Eventually(startCh).Should(Receive(Succeed())) + }) }) Describe("Processing queue items from a Controller", func() { @@ -875,3 +914,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte <-ctx.Done() return nil, errors.New("GetInformer timed out") } + +type bisignallingSource[T comparable] struct { + // receives the queue that is passed to Start + startCall chan workqueue.TypedRateLimitingInterface[T] + // passes an error to return from Start + startDone chan error + // closed when WaitForSync is called + waitCall chan struct{} + // passes an error to return from WaitForSync + waitDone chan error +} + +var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil) + +func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error { + select { + case t.startCall <- q: + case <-ctx.Done(): + return ctx.Err() + } + select { + case err := <-t.startDone: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error { + close(t.waitCall) + select { + case err := <-t.waitDone: + return err + case <-ctx.Done(): + return ctx.Err() + } +} From b2c21718ff16807bb3f00e4a32f7e9604b368507 Mon Sep 17 00:00:00 2001 From: Tarek Sharafi Date: Thu, 23 Jan 2025 13:14:40 +0200 Subject: [PATCH 17/21] Update controller_suite_test.go --- pkg/internal/controller/controller_suite_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/internal/controller/controller_suite_test.go b/pkg/internal/controller/controller_suite_test.go index dfa691aeee..3143d3dd74 100644 --- a/pkg/internal/controller/controller_suite_test.go +++ b/pkg/internal/controller/controller_suite_test.go @@ -42,12 +42,12 @@ var _ = BeforeSuite(func() { testenv = &envtest.Environment{} - // var err error - // cfg, err = testenv.Start() - // 
Expect(err).NotTo(HaveOccurred()) + var err error + cfg, err = testenv.Start() + Expect(err).NotTo(HaveOccurred()) - // clientset, err = kubernetes.NewForConfig(cfg) - // Expect(err).NotTo(HaveOccurred()) + clientset, err = kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() { From cd644c0ad54f254002da840885ec47a04aca2d3e Mon Sep 17 00:00:00 2001 From: k8s-infra-cherrypick-robot <90416843+k8s-infra-cherrypick-robot@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:48:17 -0800 Subject: [PATCH 18/21] =?UTF-8?q?[release-0.19]=20=E2=9C=A8=20Expose=20all?= =?UTF-8?q?=20Go=20runtime=20metrics=20(#3101)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add GoCollector and ProcessCollector Signed-off-by: dongjiang * move to internal/controller/metrics Signed-off-by: dongjiang * default add all go runtime metrics Signed-off-by: dongjiang --------- Signed-off-by: dongjiang Co-authored-by: dongjiang --- pkg/internal/controller/metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/internal/controller/metrics/metrics.go b/pkg/internal/controller/metrics/metrics.go index fbf15669d5..6d562efb93 100644 --- a/pkg/internal/controller/metrics/metrics.go +++ b/pkg/internal/controller/metrics/metrics.go @@ -88,7 +88,7 @@ func init() { ActiveWorkers, // expose process metrics like CPU, Memory, file descriptor usage etc. collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - // expose Go runtime metrics like GC stats, memory stats etc. - collectors.NewGoCollector(), + // expose all Go runtime metrics like GC stats, memory stats etc. 
+ collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)), ) } From a2a9f545a01901d180d41c24200dd030d0256c61 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Thu, 20 Feb 2025 08:01:34 -0500 Subject: [PATCH 19/21] fix: cache should list out of global cache when present and necessary When the cache options are configured with DefaultNamespaces which include an entry with `cache.AllNamespaces`, listing from the cache should fallback to the global cache if there are no namespace-specific caches that match the namespace from the list options. Signed-off-by: Joe Lanford --- pkg/cache/cache_test.go | 12 ++++++------ pkg/cache/multi_namespace_cache.go | 3 +++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 7a21c87c37..9513c2c175 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -130,16 +130,16 @@ var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() { var _ = Describe("Multi-Namespace Informer Cache", func() { CacheTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ - testNamespaceOne: {}, - testNamespaceTwo: {}, - "default": {}, + cache.AllNamespaces: {FieldSelector: fields.OneTermEqualSelector("metadata.namespace", testNamespaceOne)}, + testNamespaceTwo: {}, + "default": {}, }, }) NonBlockingGetTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ - testNamespaceOne: {}, - testNamespaceTwo: {}, - "default": {}, + cache.AllNamespaces: {FieldSelector: fields.OneTermEqualSelector("metadata.namespace", testNamespaceOne)}, + testNamespaceTwo: {}, + "default": {}, }, }) }) diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index da69f40f65..aeeeb66937 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -262,6 +262,9 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, if 
listOpts.Namespace != corev1.NamespaceAll { cache, ok := c.namespaceToCache[listOpts.Namespace] if !ok { + if global, hasGlobal := c.namespaceToCache[AllNamespaces]; hasGlobal { + return global.List(ctx, list, opts...) + } return fmt.Errorf("unable to list: %v because of unknown namespace for the cache", listOpts.Namespace) } return cache.List(ctx, list, opts...) From d2145c0a289e73a16f90c610c3bc2f9fbe231cf7 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Wed, 5 Mar 2025 21:05:54 -0500 Subject: [PATCH 20/21] bug: Fakeclient: Fix dataraces when writing to the scheme We have a scheme write lock but plenty of other codepaths that read from the scheme and that don't do locking, resulting in dataraces if the two happen in parallel. This change introduces a simple RW lock and makes the fakeclient acquire read locking for all its operations except when needing the write lock. This isn't particularly smart, but given that we only have one codepath that writes to the scheme, it seems good enough. 
--- pkg/client/fake/client.go | 24 ++++++++-- pkg/client/fake/client_test.go | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 54f5b5d258..e881815233 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -74,8 +74,8 @@ type fakeClient struct { trackerWriteLock sync.Mutex tracker versionedTracker - schemeWriteLock sync.Mutex - scheme *runtime.Scheme + schemeLock sync.RWMutex + scheme *runtime.Scheme restMapper meta.RESTMapper withStatusSubresource sets.Set[schema.GroupVersionKind] @@ -509,6 +509,8 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt } func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvr, err := getGVRFromObject(obj, c.scheme) if err != nil { return err @@ -558,6 +560,8 @@ func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ... 
} func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -570,9 +574,11 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl if _, isUnstructuredList := obj.(runtime.Unstructured); isUnstructuredList && !c.scheme.Recognizes(gvk) { // We need to register the ListKind with UnstructuredList: // https://github.com/kubernetes/kubernetes/blob/7b2776b89fb1be28d4e9203bdeec079be903c103/staging/src/k8s.io/client-go/dynamic/fake/simple.go#L44-L51 - c.schemeWriteLock.Lock() + c.schemeLock.RUnlock() + c.schemeLock.Lock() c.scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(gvk.Kind+"List"), &unstructured.UnstructuredList{}) - c.schemeWriteLock.Unlock() + c.schemeLock.Unlock() + c.schemeLock.RLock() } listOpts := client.ListOptions{} @@ -712,6 +718,8 @@ func (c *fakeClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { } func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() createOptions := &client.CreateOptions{} createOptions.ApplyOptions(opts) @@ -748,6 +756,8 @@ func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvr, err := getGVRFromObject(obj, c.scheme) if err != nil { return err @@ -793,6 +803,8 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -842,6 +854,8 @@ func (c *fakeClient) 
Update(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.UpdateOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() updateOptions := &client.UpdateOptions{} updateOptions.ApplyOptions(opts) @@ -870,6 +884,8 @@ func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client. } func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() patchOptions := &client.PatchOptions{} patchOptions.ApplyOptions(opts) diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index 2f5392fda7..06e2c142c7 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -2409,6 +2409,93 @@ var _ = Describe("Fake client", func() { Expect(cl.SubResource(subResourceScale).Update(context.Background(), obj, client.WithSubResourceBody(scale)).Error()).To(Equal(expectedErr)) }) + It("is threadsafe", func() { + cl := NewClientBuilder().Build() + + u := func() *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetAPIVersion("custom/v1") + u.SetKind("Version") + u.SetName("foo") + return u + } + + uList := func() *unstructured.UnstructuredList { + u := &unstructured.UnstructuredList{} + u.SetAPIVersion("custom/v1") + u.SetKind("Version") + + return u + } + + meta := func() *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "custom/v1", + Kind: "Version", + }, + } + } + metaList := func() *metav1.PartialObjectMetadataList { + return &metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + + APIVersion: "custom/v1", + Kind: "Version", + }, + } + } + + pod := func() *corev1.Pod { + return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }} + } + + ctx 
:= context.Background() + ops := []func(){ + func() { _ = cl.Create(ctx, u()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(u()), u()) }, + func() { _ = cl.Update(ctx, u()) }, + func() { _ = cl.Patch(ctx, u(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, u()) }, + func() { _ = cl.DeleteAllOf(ctx, u(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, uList()) }, + + func() { _ = cl.Create(ctx, meta()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(meta()), meta()) }, + func() { _ = cl.Update(ctx, meta()) }, + func() { _ = cl.Patch(ctx, meta(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, meta()) }, + func() { _ = cl.DeleteAllOf(ctx, meta(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, metaList()) }, + + func() { _ = cl.Create(ctx, pod()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(pod()), pod()) }, + func() { _ = cl.Update(ctx, pod()) }, + func() { _ = cl.Patch(ctx, pod(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, pod()) }, + func() { _ = cl.DeleteAllOf(ctx, pod(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, &corev1.PodList{}) }, + } + + wg := sync.WaitGroup{} + wg.Add(len(ops)) + for _, op := range ops { + go func() { + defer wg.Done() + op() + }() + } + + wg.Wait() + }) + scalableObjs := []client.Object{ &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -2497,6 +2584,7 @@ var _ = Describe("Fake client", func() { scaleExpected.ResourceVersion = scaleActual.ResourceVersion Expect(cmp.Diff(scaleExpected, scaleActual)).To(BeEmpty()) }) + } }) From 97bb1ffeede43378908af5cd38c03d36ac8f977c Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 7 Mar 2025 15:19:49 -0500 Subject: [PATCH 21/21] =?UTF-8?q?Revert=20"[release-0.20]=20=E2=9C=A8=20Ex?= =?UTF-8?q?pose=20all=20Go=20runtime=20metrics=20(#3100)"?= MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit fc485839d5c10112642325e7e6df25084553c0e4. This change breaks some users. --- pkg/internal/controller/metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/internal/controller/metrics/metrics.go b/pkg/internal/controller/metrics/metrics.go index 6d562efb93..fbf15669d5 100644 --- a/pkg/internal/controller/metrics/metrics.go +++ b/pkg/internal/controller/metrics/metrics.go @@ -88,7 +88,7 @@ func init() { ActiveWorkers, // expose process metrics like CPU, Memory, file descriptor usage etc. collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - // expose all Go runtime metrics like GC stats, memory stats etc. - collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)), + // expose Go runtime metrics like GC stats, memory stats etc. + collectors.NewGoCollector(), ) }