diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 67dabfd733..7f408e8192 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -14,6 +14,8 @@ jobs: name: Upload binaries to release runs-on: ubuntu-latest steps: + - name: Set env + run: echo "RELEASE_TAG=${GITHUB_REF:10}" >> $GITHUB_ENV - name: Check out code uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # tag=v4.2.2 - name: Calculate go version diff --git a/Makefile b/Makefile index 0406fc8a60..2361df981d 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ SHELL:=/usr/bin/env bash # # Go. # -GO_VERSION ?= 1.23.2 +GO_VERSION ?= 1.23.0 # Use GOPROXY environment variable if set GOPROXY := $(shell go env GOPROXY) @@ -88,7 +88,7 @@ GO_APIDIFF_PKG := github.com/joelanford/go-apidiff $(GO_APIDIFF): # Build go-apidiff from tools folder. GOBIN=$(TOOLS_BIN_DIR) $(GO_INSTALL) $(GO_APIDIFF_PKG) $(GO_APIDIFF_BIN) $(GO_APIDIFF_VER) -CONTROLLER_GEN_VER := v0.14.0 +CONTROLLER_GEN_VER := v0.17.1 CONTROLLER_GEN_BIN := controller-gen CONTROLLER_GEN := $(abspath $(TOOLS_BIN_DIR)/$(CONTROLLER_GEN_BIN)-$(CONTROLLER_GEN_VER)) CONTROLLER_GEN_PKG := sigs.k8s.io/controller-tools/cmd/controller-gen @@ -174,7 +174,7 @@ release-binary: $(RELEASE_DIR) -v "$$(pwd):/workspace$(DOCKER_VOL_OPTS)" \ -w /workspace/tools/setup-envtest \ golang:$(GO_VERSION) \ - go build -a -trimpath -ldflags "-extldflags '-static'" \ + go build -a -trimpath -ldflags "-X 'sigs.k8s.io/controller-runtime/tools/setup-envtest/version.version=$(RELEASE_TAG)' -extldflags '-static'" \ -o ./out/$(RELEASE_BINARY) ./ ## -------------------------------------- diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go index 2b09432f22..8dacdcc9a3 100644 --- a/examples/priorityqueue/main.go +++ b/examples/priorityqueue/main.go @@ -22,6 +22,7 @@ import ( "os" "time" + "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -45,7 +46,9 @@ func main() { } func run() error { - log.SetLogger(zap.New()) + log.SetLogger(zap.New(func(o *zap.Options) { + o.Level = zapcore.Level(-5) + })) // Setup a Manager mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{ diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index bd7fc50656..8fc685c369 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -3,7 +3,7 @@ module sigs.k8s.io/controller-runtime/examples/scratch-env go 1.23.0 require ( - github.com/spf13/pflag v1.0.5 + github.com/spf13/pflag v1.0.6 go.uber.org/zap v1.27.0 sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000 ) @@ -13,7 +13,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/evanphx/json-patch/v5 v5.9.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -53,13 +53,13 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.32.0 // indirect - k8s.io/apiextensions-apiserver v0.32.0 // indirect - k8s.io/apimachinery v0.32.0 // indirect - k8s.io/client-go v0.32.0 // indirect + k8s.io/api v0.32.1 // indirect + k8s.io/apiextensions-apiserver v0.32.1 // indirect + k8s.io/apimachinery v0.32.1 // indirect + k8s.io/client-go v0.32.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect - k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 63a151e33f..59b01aaac3 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -11,8 +11,8 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= -github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= -github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= +github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= +github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= @@ -71,10 +71,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= -github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= +github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -90,8 +90,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -166,20 +166,20 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.32.0 h1:OL9JpbvAU5ny9ga2fb24X8H6xQlVp+aJMFlgtQjR9CE= -k8s.io/api v0.32.0/go.mod h1:4LEwHZEf6Q/cG96F3dqR965sYOfmPM7rq81BLgsE0p0= -k8s.io/apiextensions-apiserver v0.32.0 h1:S0Xlqt51qzzqjKPxfgX1xh4HBZE+p8KKBq+k2SWNOE0= -k8s.io/apiextensions-apiserver v0.32.0/go.mod h1:86hblMvN5yxMvZrZFX2OhIHAuFIMJIZ19bTvzkP+Fmw= -k8s.io/apimachinery v0.32.0 h1:cFSE7N3rmEEtv4ei5X6DaJPHHX0C+upp+v5lVPiEwpg= -k8s.io/apimachinery v0.32.0/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= -k8s.io/client-go v0.32.0 h1:DimtMcnN/JIKZcrSrstiwvvZvLjG0aSxy8PxN8IChp8= -k8s.io/client-go v0.32.0/go.mod h1:boDWvdM1Drk4NJj/VddSLnx59X3OPgwrOo0vGbtq9+8= +k8s.io/api v0.32.1 h1:f562zw9cy+GvXzXf0CKlVQ7yHJVYzLfL6JAS4kOAaOc= +k8s.io/api v0.32.1/go.mod h1:/Yi/BqkuueW1BgpoePYBRdDYfjPF5sgTr5+YqDZra5k= +k8s.io/apiextensions-apiserver v0.32.1 h1:hjkALhRUeCariC8DiVmb5jj0VjIc1N0DREP32+6UXZw= +k8s.io/apiextensions-apiserver v0.32.1/go.mod h1:sxWIGuGiYov7Io1fAS2X06NjMIk5CbRHc2StSmbaQto= +k8s.io/apimachinery v0.32.1 h1:683ENpaCBjma4CYqsmZyhEzrGz6cjn1MY/X2jB2hkZs= +k8s.io/apimachinery v0.32.1/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/client-go v0.32.1 h1:otM0AxdhdBIaQh7l1Q0jQpmo7WOFIk5FFa4bg6YMdUU= +k8s.io/client-go v0.32.1/go.mod h1:aTTKZY7MdxUaJ/KiUs8D+GssR9zJZi77ZqtzcGXIiDg= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= -k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0= -k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= diff --git a/go.mod b/go.mod index ae141ccb72..126c195aba 100644 --- a/go.mod +++ b/go.mod @@ -3,30 +3,29 @@ module sigs.k8s.io/controller-runtime go 1.23.0 require ( - github.com/evanphx/json-patch/v5 v5.9.0 + github.com/evanphx/json-patch/v5 v5.9.11 github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/btree v1.1.3 github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.0 - github.com/onsi/ginkgo/v2 v2.21.0 - github.com/onsi/gomega v1.35.1 + github.com/onsi/ginkgo/v2 v2.22.0 + github.com/onsi/gomega v1.36.1 github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 golang.org/x/sync v0.8.0 golang.org/x/sys v0.26.0 gomodules.xyz/jsonpatch/v2 v2.4.0 gopkg.in/evanphx/json-patch.v4 v4.12.0 // Using v4 to match upstream - k8s.io/api v0.32.0 - k8s.io/apiextensions-apiserver v0.32.0 - k8s.io/apimachinery v0.32.0 - k8s.io/apiserver v0.32.0 - k8s.io/client-go v0.32.0 + k8s.io/api v0.32.1 + k8s.io/apiextensions-apiserver v0.32.1 + k8s.io/apimachinery v0.32.1 + k8s.io/apiserver v0.32.1 + k8s.io/client-go v0.32.1 k8s.io/klog/v2 v2.130.1 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 sigs.k8s.io/yaml v1.4.0 @@ -79,6 +78,7 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/term v0.25.0 // indirect @@ -91,7 +91,7 @@ require ( google.golang.org/protobuf v1.35.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.32.0 // indirect + k8s.io/component-base v0.32.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect diff --git a/go.sum b/go.sum index bc183cde97..0bd9ded5a2 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= -github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= -github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= +github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= +github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -93,10 +93,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= -github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg= +github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= +github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -219,18 +219,18 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.32.0 h1:OL9JpbvAU5ny9ga2fb24X8H6xQlVp+aJMFlgtQjR9CE= -k8s.io/api v0.32.0/go.mod h1:4LEwHZEf6Q/cG96F3dqR965sYOfmPM7rq81BLgsE0p0= -k8s.io/apiextensions-apiserver v0.32.0 h1:S0Xlqt51qzzqjKPxfgX1xh4HBZE+p8KKBq+k2SWNOE0= -k8s.io/apiextensions-apiserver v0.32.0/go.mod h1:86hblMvN5yxMvZrZFX2OhIHAuFIMJIZ19bTvzkP+Fmw= -k8s.io/apimachinery v0.32.0 h1:cFSE7N3rmEEtv4ei5X6DaJPHHX0C+upp+v5lVPiEwpg= -k8s.io/apimachinery v0.32.0/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= -k8s.io/apiserver v0.32.0 h1:VJ89ZvQZ8p1sLeiWdRJpRD6oLozNZD2+qVSLi+ft5Qs= -k8s.io/apiserver v0.32.0/go.mod h1:HFh+dM1/BE/Hm4bS4nTXHVfN6Z6tFIZPi649n83b4Ag= -k8s.io/client-go v0.32.0 h1:DimtMcnN/JIKZcrSrstiwvvZvLjG0aSxy8PxN8IChp8= -k8s.io/client-go v0.32.0/go.mod h1:boDWvdM1Drk4NJj/VddSLnx59X3OPgwrOo0vGbtq9+8= -k8s.io/component-base v0.32.0 h1:d6cWHZkCiiep41ObYQS6IcgzOUQUNpywm39KVYaUqzU= -k8s.io/component-base v0.32.0/go.mod h1:JLG2W5TUxUu5uDyKiH2R/7NnxJo1HlPoRIIbVLkK5eM= +k8s.io/api v0.32.1 h1:f562zw9cy+GvXzXf0CKlVQ7yHJVYzLfL6JAS4kOAaOc= +k8s.io/api v0.32.1/go.mod h1:/Yi/BqkuueW1BgpoePYBRdDYfjPF5sgTr5+YqDZra5k= +k8s.io/apiextensions-apiserver v0.32.1 h1:hjkALhRUeCariC8DiVmb5jj0VjIc1N0DREP32+6UXZw= +k8s.io/apiextensions-apiserver v0.32.1/go.mod h1:sxWIGuGiYov7Io1fAS2X06NjMIk5CbRHc2StSmbaQto= +k8s.io/apimachinery v0.32.1 h1:683ENpaCBjma4CYqsmZyhEzrGz6cjn1MY/X2jB2hkZs= +k8s.io/apimachinery v0.32.1/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/apiserver v0.32.1 h1:oo0OozRos66WFq87Zc5tclUX2r0mymoVHRq8JmR7Aak= +k8s.io/apiserver v0.32.1/go.mod h1:UcB9tWjBY7aryeI5zAgzVJB/6k7E97bkr1RgqDz0jPw= +k8s.io/client-go v0.32.1 h1:otM0AxdhdBIaQh7l1Q0jQpmo7WOFIk5FFa4bg6YMdUU= +k8s.io/client-go v0.32.1/go.mod h1:aTTKZY7MdxUaJ/KiUs8D+GssR9zJZi77ZqtzcGXIiDg= +k8s.io/component-base v0.32.1 h1:/5IfJ0dHIKBWysGV0yKTFfacZ5yNV1sulPh3ilJjRZk= +k8s.io/component-base v0.32.1/go.mod h1:j1iMMHi/sqAHeG5z+O9BFNCF698a1u0186zkjMZQ28w= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 0760953e02..6d906f6e52 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches( ) *TypedBuilder[request] { input := WatchesInput[request]{ obj: object, - handler: handler.WithLowPriorityWhenUnchanged(eventHandler), + handler: eventHandler, } for _, opt := range opts { opt.ApplyToWatches(&input) @@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error { } var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}))) + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{})) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, blder.forInput.predicates...) src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) @@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error { } var hdler handler.TypedEventHandler[client.Object, request] - reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner( + reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner( blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(), blder.forInput.object, opts..., - )))) + ))) allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index ecffe07988..8f14bfdbfc 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -469,6 +469,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } } + opts.ByObject = maps.Clone(opts.ByObject) + opts.DefaultNamespaces = maps.Clone(opts.DefaultNamespaces) for obj, byObject := range opts.ByObject { isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) if err != nil { @@ -480,6 +482,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if isNamespaced && byObject.Namespaces == nil { byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) + } else { + byObject.Namespaces = maps.Clone(byObject.Namespaces) } // Default the namespace-level configs first, because they need to use the undefaulted type-level config @@ -487,7 +491,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { for namespace, config := range byObject.Namespaces { // 1. Default from the undefaulted type-level config config = defaultConfig(config, byObjectToConfig(byObject)) - // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but // might not have an entry for the current namespace. if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index df7d994ede..9049129cc4 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -130,16 +130,16 @@ var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() { var _ = Describe("Multi-Namespace Informer Cache", func() { CacheTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ - testNamespaceOne: {}, - testNamespaceTwo: {}, - "default": {}, + cache.AllNamespaces: {FieldSelector: fields.OneTermEqualSelector("metadata.namespace", testNamespaceOne)}, + testNamespaceTwo: {}, + "default": {}, }, }) NonBlockingGetTest(cache.New, cache.Options{ DefaultNamespaces: map[string]cache.Config{ - testNamespaceOne: {}, - testNamespaceTwo: {}, - "default": {}, + cache.AllNamespaces: {FieldSelector: fields.OneTermEqualSelector("metadata.namespace", testNamespaceOne)}, + testNamespaceTwo: {}, + "default": {}, }, }) }) diff --git a/pkg/cache/defaulting_test.go b/pkg/cache/defaulting_test.go index 8e3033eb47..d9d0dcceb3 100644 --- a/pkg/cache/defaulting_test.go +++ b/pkg/cache/defaulting_test.go @@ -18,6 +18,7 @@ package cache import ( "reflect" + "sync" "testing" "time" @@ -432,6 +433,34 @@ func TestDefaultOpts(t *testing.T) { } } +func TestDefaultOptsRace(t *testing.T) { + opts := Options{ + Mapper: &fakeRESTMapper{}, + ByObject: map[client.Object]ByObject{ + &corev1.Pod{}: { + Label: labels.SelectorFromSet(map[string]string{"from": "pod"}), + Namespaces: map[string]Config{"default": { + LabelSelector: labels.SelectorFromSet(map[string]string{"from": "pod"}), + }}, + }, + }, + DefaultNamespaces: map[string]Config{"default": {}}, + } + + // Start go routines which re-use the above options struct. + wg := sync.WaitGroup{} + for range 2 { + wg.Add(1) + go func() { + _, _ = defaultOpts(&rest.Config{}, opts) + wg.Done() + }() + } + + // Wait for the go routines to finish. + wg.Wait() +} + type fakeRESTMapper struct { meta.RESTMapper } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index da69f40f65..aeeeb66937 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -262,6 +262,9 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, if listOpts.Namespace != corev1.NamespaceAll { cache, ok := c.namespaceToCache[listOpts.Namespace] if !ok { + if global, hasGlobal := c.namespaceToCache[AllNamespaces]; hasGlobal { + return global.List(ctx, list, opts...) + } return fmt.Errorf("unable to list: %v because of unknown namespace for the cache", listOpts.Namespace) } return cache.List(ctx, list, opts...) diff --git a/pkg/client/apiutil/restmapper.go b/pkg/client/apiutil/restmapper.go index ad898617fa..7a7a0d1145 100644 --- a/pkg/client/apiutil/restmapper.go +++ b/pkg/client/apiutil/restmapper.go @@ -246,10 +246,18 @@ func (m *mapper) addGroupVersionResourcesToCacheAndReloadLocked(gvr map[schema.G } if !found { - groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ + gv := metav1.GroupVersionForDiscovery{ GroupVersion: metav1.GroupVersion{Group: groupVersion.Group, Version: version}.String(), Version: version, - }) + } + + // Prepend if preferred version, else append. The upstream DiscoveryRestMappper assumes + // the first version is the preferred one: https://github.com/kubernetes/kubernetes/blob/ef54ac803b712137871c1a1f8d635d50e69ffa6c/staging/src/k8s.io/apimachinery/pkg/api/meta/restmapper.go#L458-L461 + if group, ok := m.apiGroups[groupVersion.Group]; ok && group.PreferredVersion.Version == version { + groupResources.Group.Versions = append([]metav1.GroupVersionForDiscovery{gv}, groupResources.Group.Versions...) + } else { + groupResources.Group.Versions = append(groupResources.Group.Versions, gv) + } } // Update data in the cache. @@ -284,14 +292,14 @@ func (m *mapper) findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName s } m.initialDiscoveryDone = true - if len(maybeResources) > 0 { - didAggregatedDiscovery = true - m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources) - } for i := range apiGroups.Groups { group := &apiGroups.Groups[i] m.apiGroups[group.Name] = group } + if len(maybeResources) > 0 { + didAggregatedDiscovery = true + m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources) + } // Looking in the cache again. // Don't return an error here if the API group is not present. diff --git a/pkg/client/apiutil/restmapper_test.go b/pkg/client/apiutil/restmapper_test.go index 00117d00a8..395daccce4 100644 --- a/pkg/client/apiutil/restmapper_test.go +++ b/pkg/client/apiutil/restmapper_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "testing" _ "github.com/onsi/ginkgo/v2" @@ -735,6 +736,33 @@ func TestLazyRestMapperProvider(t *testing.T) { g.Expect(err).NotTo(gmg.HaveOccurred()) g.Expect(mapping.Resource.Version).To(gmg.Equal("v1")) }) + + t.Run("Restmapper should consistently return the preferred version", func(t *testing.T) { + g := gmg.NewWithT(t) + + wg := sync.WaitGroup{} + wg.Add(50) + for i := 0; i < 50; i++ { + go func() { + defer wg.Done() + httpClient, err := rest.HTTPClientFor(restCfg) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + mapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient) + g.Expect(err).NotTo(gmg.HaveOccurred()) + + mapping, err := mapper.RESTMapping(schema.GroupKind{ + Group: "crew.example.com", + Kind: "Driver", + }) + g.Expect(err).NotTo(gmg.HaveOccurred()) + // APIServer seems to have a heuristic to prefer the higher + // version number. + g.Expect(mapping.GroupVersionKind.Version).To(gmg.Equal("v2")) + }() + } + wg.Wait() + }) }) } } diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 0c4300d548..793219c723 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -75,8 +75,8 @@ type fakeClient struct { trackerWriteLock sync.Mutex tracker versionedTracker - schemeWriteLock sync.Mutex - scheme *runtime.Scheme + schemeLock sync.RWMutex + scheme *runtime.Scheme restMapper meta.RESTMapper withStatusSubresource sets.Set[schema.GroupVersionKind] @@ -512,6 +512,8 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt } func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvr, err := getGVRFromObject(obj, c.scheme) if err != nil { return err @@ -561,6 +563,8 @@ func (c *fakeClient) Watch(ctx context.Context, list client.ObjectList, opts ... } func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -573,9 +577,11 @@ func (c *fakeClient) List(ctx context.Context, obj client.ObjectList, opts ...cl if _, isUnstructuredList := obj.(runtime.Unstructured); isUnstructuredList && !c.scheme.Recognizes(gvk) { // We need to register the ListKind with UnstructuredList: // https://github.com/kubernetes/kubernetes/blob/7b2776b89fb1be28d4e9203bdeec079be903c103/staging/src/k8s.io/client-go/dynamic/fake/simple.go#L44-L51 - c.schemeWriteLock.Lock() + c.schemeLock.RUnlock() + c.schemeLock.Lock() c.scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(gvk.Kind+"List"), &unstructured.UnstructuredList{}) - c.schemeWriteLock.Unlock() + c.schemeLock.Unlock() + c.schemeLock.RLock() } listOpts := client.ListOptions{} @@ -726,6 +732,8 @@ func (c *fakeClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { } func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() createOptions := &client.CreateOptions{} createOptions.ApplyOptions(opts) @@ -762,6 +770,8 @@ func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvr, err := getGVRFromObject(obj, c.scheme) if err != nil { return err @@ -807,6 +817,8 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -856,6 +868,8 @@ func (c *fakeClient) Update(ctx context.Context, obj client.Object, opts ...clie } func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.UpdateOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() updateOptions := &client.UpdateOptions{} updateOptions.ApplyOptions(opts) @@ -884,6 +898,8 @@ func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client. } func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + c.schemeLock.RLock() + defer c.schemeLock.RUnlock() patchOptions := &client.PatchOptions{} patchOptions.ApplyOptions(opts) diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index db768cca37..f6a493f54d 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -2516,6 +2516,93 @@ var _ = Describe("Fake client", func() { Expect(cl.SubResource(subResourceScale).Update(context.Background(), obj, client.WithSubResourceBody(scale)).Error()).To(Equal(expectedErr)) }) + It("is threadsafe", func() { + cl := NewClientBuilder().Build() + + u := func() *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetAPIVersion("custom/v1") + u.SetKind("Version") + u.SetName("foo") + return u + } + + uList := func() *unstructured.UnstructuredList { + u := &unstructured.UnstructuredList{} + u.SetAPIVersion("custom/v1") + u.SetKind("Version") + + return u + } + + meta := func() *metav1.PartialObjectMetadata { + return &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "custom/v1", + Kind: "Version", + }, + } + } + metaList := func() *metav1.PartialObjectMetadataList { + return &metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + + APIVersion: "custom/v1", + Kind: "Version", + }, + } + } + + pod := func() *corev1.Pod { + return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }} + } + + ctx := context.Background() + ops := []func(){ + func() { _ = cl.Create(ctx, u()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(u()), u()) }, + func() { _ = cl.Update(ctx, u()) }, + func() { _ = cl.Patch(ctx, u(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, u()) }, + func() { _ = cl.DeleteAllOf(ctx, u(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, uList()) }, + + func() { _ = cl.Create(ctx, meta()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(meta()), meta()) }, + func() { _ = cl.Update(ctx, meta()) }, + func() { _ = cl.Patch(ctx, meta(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, meta()) }, + func() { _ = cl.DeleteAllOf(ctx, meta(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, metaList()) }, + + func() { _ = cl.Create(ctx, pod()) }, + func() { _ = cl.Get(ctx, client.ObjectKeyFromObject(pod()), pod()) }, + func() { _ = cl.Update(ctx, pod()) }, + func() { _ = cl.Patch(ctx, pod(), client.RawPatch(types.StrategicMergePatchType, []byte("foo"))) }, + func() { _ = cl.Delete(ctx, pod()) }, + func() { _ = cl.DeleteAllOf(ctx, pod(), client.HasLabels{"foo"}) }, + func() { _ = cl.List(ctx, &corev1.PodList{}) }, + } + + wg := sync.WaitGroup{} + wg.Add(len(ops)) + for _, op := range ops { + go func() { + defer wg.Done() + op() + }() + } + + wg.Wait() + }) + scalableObjs := []client.Object{ &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -2604,6 +2691,7 @@ var _ = Describe("Fake client", func() { scaleExpected.ResourceVersion = scaleActual.ResourceVersion Expect(cmp.Diff(scaleExpected, scaleActual)).To(BeEmpty()) }) + } }) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b7d7286033..5c5b249ef5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.Log = mgr.GetLogger().WithValues("controller", controllerName) o.RateLimiter = rateLimiter }) } diff --git a/pkg/controller/name.go b/pkg/controller/name.go index 0e71a01c66..00ca655128 100644 --- a/pkg/controller/name.go +++ b/pkg/controller/name.go @@ -34,7 +34,7 @@ func checkName(name string) error { } if usedNames.Has(name) { - return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting to the same metric", name) + return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting the same metric. This validation can be disabled via the SkipNameValidation option", name) } usedNames.Insert(name) diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index f6a2697a65..36626646f4 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) { return } + m.depth.Dec() + m.mapLock.Lock() defer m.mapLock.Unlock() - m.depth.Dec() - m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { m.latency.Observe(m.sinceInSeconds(startTime)) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 2b3a8904d7..ff5dea9021 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -5,11 +5,13 @@ import ( "sync/atomic" "time" + "github.com/go-logr/logr" "github.com/google/btree" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" ) @@ -36,6 +38,7 @@ type Opts[T comparable] struct { // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. RateLimiter workqueue.TypedRateLimiter[T] MetricProvider workqueue.MetricsProvider + Log logr.Logger } // Opt allows to configure a PriorityQueue. @@ -57,6 +60,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } pq := &priorityqueue[T]{ + log: opts.Log, items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), becameReady: sets.Set[T]{}, @@ -75,6 +79,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } go pq.spin() + go pq.logState() if _, ok := pq.metrics.(noMetrics[T]); !ok { go pq.updateUnfinishedWorkLoop() } @@ -83,6 +88,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } type priorityqueue[T comparable] struct { + log logr.Logger // lock has to be acquired for any access any of items, queue, addedCounter // or becameReady lock sync.Mutex @@ -127,28 +133,29 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { defer w.lock.Unlock() for _, key := range items { + after := o.After if o.RateLimited { - after := w.rateLimiter.When(key) - if o.After == 0 || after < o.After { - o.After = after + rlAfter := w.rateLimiter.When(key) + if after == 0 || rlAfter < after { + after = rlAfter } } var readyAt *time.Time - if o.After > 0 { - readyAt = ptr.To(w.now().Add(o.After)) + if after > 0 { + readyAt = ptr.To(w.now().Add(after)) w.metrics.retry() } if _, ok := w.items[key]; !ok { item := &item[T]{ - key: key, - addedCounter: w.addedCounter, - priority: o.Priority, - readyAt: readyAt, + Key: key, + AddedCounter: w.addedCounter, + Priority: o.Priority, + ReadyAt: readyAt, } w.items[key] = item w.queue.ReplaceOrInsert(item) - if item.readyAt == nil { + if item.ReadyAt == nil { w.metrics.add(key) } w.addedCounter++ @@ -158,15 +165,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.priority { - item.priority = o.Priority + if o.Priority > item.Priority { + item.Priority = o.Priority } - if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { - if readyAt == nil { + if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { + if readyAt == nil && !w.becameReady.Has(key) { w.metrics.add(key) } - item.readyAt = readyAt + item.ReadyAt = readyAt } w.queue.ReplaceOrInsert(item) @@ -210,14 +217,14 @@ func (w *priorityqueue[T]) spin() { // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt != nil { - if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 { + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { nextReady = w.tick(readyAt) return false } - if !w.becameReady.Has(item.key) { - w.metrics.add(item.key) - w.becameReady.Insert(item.key) + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key) + w.becameReady.Insert(item.Key) } } @@ -228,16 +235,16 @@ func (w *priorityqueue[T]) spin() { } // Item is locked, we can not hand it out - if w.locked.Has(item.key) { + if w.locked.Has(item.Key) { return true } - w.metrics.get(item.key) - w.locked.Insert(item.key) + w.metrics.get(item.Key) + w.locked.Insert(item.Key) w.waiters.Add(-1) - delete(w.items, item.key) + delete(w.items, item.Key) toDelete = append(toDelete, item) - w.becameReady.Delete(item.key) + w.becameReady.Delete(item.Key) w.get <- *item return true @@ -268,7 +275,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() item := <-w.get - return item.key, item.priority, w.shutdown.Load() + return item.Key, item.Priority, w.shutdown.Load() } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -316,7 +323,7 @@ func (w *priorityqueue[T]) Len() int { var result int w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 { + if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 { result++ return true } @@ -326,36 +333,64 @@ func (w *priorityqueue[T]) Len() int { return result } +func (w *priorityqueue[T]) logState() { + t := time.Tick(10 * time.Second) + for { + select { + case <-w.done: + return + case <-t: + } + + // Log level may change at runtime, so keep the + // loop going even if a given level is currently + // not enabled. + if !w.log.V(5).Enabled() { + continue + } + w.lock.Lock() + items := make([]*item[T], 0, len(w.items)) + w.queue.Ascend(func(item *item[T]) bool { + items = append(items, item) + return true + }) + w.lock.Unlock() + + w.log.V(5).Info("workqueue_items", "items", items) + } +} + func less[T comparable](a, b *item[T]) bool { - if a.readyAt == nil && b.readyAt != nil { + if a.ReadyAt == nil && b.ReadyAt != nil { return true } - if b.readyAt == nil && a.readyAt != nil { + if b.ReadyAt == nil && a.ReadyAt != nil { return false } - if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) { - return a.readyAt.Before(*b.readyAt) + if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { + return a.ReadyAt.Before(*b.ReadyAt) } - if a.priority != b.priority { - return a.priority > b.priority + if a.Priority != b.Priority { + return a.Priority > b.Priority } - return a.addedCounter < b.addedCounter + return a.AddedCounter < b.AddedCounter } type item[T comparable] struct { - key T - addedCounter uint64 - priority int - readyAt *time.Time + Key T `json:"key"` + AddedCounter uint64 `json:"addedCounter"` + Priority int `json:"priority"` + ReadyAt *time.Time `json:"readyAt,omitempty"` } func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { - t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 - defer t.Stop() - for range t.C { - if w.shutdown.Load() { + t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 + for { + select { + case <-w.done: return + case <-t: } w.metrics.updateUnfinishedWork() } diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index e431c993fb..f54d3cc11c 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -11,6 +11,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" ) var _ = Describe("Controllerworkqueue", func() { @@ -395,6 +396,114 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(1)) metrics.mu.Lock() Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + // Get the item to ensure the codepath in + // `spin` for the metrics is passed by so + // that this starts failing if it incorrectly + // calls `metrics.add` again. + item, _ := q.Get() + Expect(item).To(Equal("foo")) + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() + }) + + It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo") + time.Sleep(100 * time.Millisecond) + + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + q.AddWithOpts(AddOpts{}, "foo") + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + // Get the item to ensure the codepath in + // `spin` for the metrics is passed by so + // that this starts failing if it incorrectly + // calls `metrics.add` again. + item, _ := q.Get() + Expect(item).To(Equal("foo")) + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() + }) + + It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() { + q, metrics := newQueue() + defer q.ShutDown() + + now := time.Now().Round(time.Second) + nowLock := sync.Mutex{} + tick := make(chan time.Time) + + cwq := q.(*priorityqueue[string]) + cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second) + cwq.now = func() time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + cwq.tick = func(d time.Duration) <-chan time.Time { + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + + Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond))) + }() + <-done + return tick + } + + retrievedItem := make(chan struct{}) + retrievedSecondItem := make(chan struct{}) + + go func() { + defer GinkgoRecover() + first, _, _ := q.GetWithPriority() + Expect(first).To(Equal("foo")) + close(retrievedItem) + + second, _, _ := q.GetWithPriority() + Expect(second).To(Equal("bar")) + close(retrievedSecondItem) + }() + + // after 7 calls, the next When("bar") call will return 640ms. + for range 7 { + cwq.rateLimiter.When("bar") + } + q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar") + + Consistently(retrievedItem).ShouldNot(BeClosed()) + nowLock.Lock() + now = now.Add(5 * time.Millisecond) + nowLock.Unlock() + tick <- now + Eventually(retrievedItem).Should(BeClosed()) + + Consistently(retrievedSecondItem).ShouldNot(BeClosed()) + nowLock.Lock() + now = now.Add(635 * time.Millisecond) + nowLock.Unlock() + tick <- now + Eventually(retrievedSecondItem).Should(BeClosed()) + + Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.adds["test"]).To(Equal(2)) + Expect(metrics.retries["test"]).To(Equal(2)) }) }) diff --git a/pkg/envtest/crd.go b/pkg/envtest/crd.go index 49f6b149be..8ed2224cfe 100644 --- a/pkg/envtest/crd.go +++ b/pkg/envtest/crd.go @@ -94,7 +94,7 @@ func InstallCRDs(config *rest.Config, options CRDInstallOptions) ([]*apiextensio defaultCRDOptions(&options) // Read the CRD yamls into options.CRDs - if err := readCRDFiles(&options); err != nil { + if err := ReadCRDFiles(&options); err != nil { return nil, fmt.Errorf("unable to read CRD files: %w", err) } @@ -115,8 +115,8 @@ func InstallCRDs(config *rest.Config, options CRDInstallOptions) ([]*apiextensio return options.CRDs, nil } -// readCRDFiles reads the directories of CRDs in options.Paths and adds the CRD structs to options.CRDs. -func readCRDFiles(options *CRDInstallOptions) error { +// ReadCRDFiles reads the directories of CRDs in options.Paths and adds the CRD structs to options.CRDs. +func ReadCRDFiles(options *CRDInstallOptions) error { if len(options.Paths) > 0 { crdList, err := renderCRDs(options) if err != nil { @@ -217,7 +217,7 @@ func (p *poller) poll(ctx context.Context) (done bool, err error) { // UninstallCRDs uninstalls a collection of CRDs by reading the crd yaml files from a directory. func UninstallCRDs(config *rest.Config, options CRDInstallOptions) error { // Read the CRD yamls into options.CRDs - if err := readCRDFiles(&options); err != nil { + if err := ReadCRDFiles(&options); err != nil { return err } diff --git a/pkg/envtest/crd_test.go b/pkg/envtest/crd_test.go index 92dc48e963..a1406615d6 100644 --- a/pkg/envtest/crd_test.go +++ b/pkg/envtest/crd_test.go @@ -31,7 +31,7 @@ var _ = Describe("Test", func() { "testdata/crdv1_original", }, } - err := readCRDFiles(&opt) + err := ReadCRDFiles(&opt) Expect(err).NotTo(HaveOccurred()) expectedCRDs := sets.NewString( diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index 1a1d1ab2f4..64cbe8a4d1 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event. enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), - }}) + }} + + addToQueueCreate(q, evt, item) } // Update implements EventHandler. func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) { switch { case !isNil(evt.ObjectNew): - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), - }}) + }} + + addToQueueUpdate(q, evt, item) case !isNil(evt.ObjectOld): - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + item := reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), - }}) + }} + + addToQueueUpdate(q, evt, item) default: enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) } diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index 491bc40c42..be97fa3781 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -21,6 +21,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -63,7 +64,8 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler { // TypedEnqueueRequestsFromMapFunc is experimental and subject to future change. func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] { return &enqueueRequestsFromMapFunc[object, request]{ - toRequests: fn, + toRequests: fn, + objectImplementsClientObject: implementsClientObject[object](), } } @@ -71,7 +73,8 @@ var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Reques type enqueueRequestsFromMapFunc[object any, request comparable] struct { // Mapper transforms the argument into a slice of keys to be reconciled - toRequests TypedMapFunc[object, request] + toRequests TypedMapFunc[object, request] + objectImplementsClientObject bool } // Create implements EventHandler. @@ -81,7 +84,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + + var lowPriority bool + if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) { + clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)} + if isObjectUnchanged(clientObjectEvent) { + lowPriority = true + } + } + e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority) } // Update implements EventHandler. @@ -90,9 +101,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update( evt event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request], ) { + var lowPriority bool + if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) { + lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion() + } reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) - e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) + e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority) + e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority) } // Delete implements EventHandler. @@ -102,7 +117,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs, false) } // Generic implements EventHandler. @@ -112,14 +127,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic( q workqueue.TypedRateLimitingInterface[request], ) { reqs := map[request]empty{} - e.mapAndEnqueue(ctx, q, evt.Object, reqs) + e.mapAndEnqueue(ctx, q, evt.Object, reqs, false) } -func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) { +func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue( + ctx context.Context, + q workqueue.TypedRateLimitingInterface[request], + o object, + reqs map[request]empty, + lowPriority bool, +) { for _, req := range e.toRequests(ctx, o) { _, ok := reqs[req] if !ok { - q.Add(req) + if lowPriority { + q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{ + Priority: LowPriority, + }, req) + } else { + q.Add(req) + } reqs[req] = empty{} } } diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 1680043b46..e8fc8eb46e 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -72,7 +72,7 @@ func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, m for _, opt := range opts { opt(e) } - return e + return WithLowPriorityWhenUnchanged(e) } // OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true. diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index 57107f20e9..84a10ac077 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -18,6 +18,7 @@ package handler import ( "context" + "reflect" "time" "k8s.io/client-go/util/workqueue" @@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct { GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request]) } +var typeForClientObject = reflect.TypeFor[client.Object]() + +func implementsClientObject[object any]() bool { + return reflect.TypeFor[object]().Implements(typeForClientObject) +} + +func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool { + _, ok := q.(priorityqueue.PriorityQueue[request]) + return ok +} + // Create implements EventHandler. func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.CreateFunc != nil { - h.CreateFunc(ctx, e, q) + if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) { + h.CreateFunc(ctx, e, q) + return + } + wq := workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: q, + // We already know that we have a priority queue, that event.Object implements + // client.Object and that its not nil + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + // We construct a new event typed to client.Object because isObjectUnchanged + // is a generic and hence has to know at compile time the type of the event + // it gets. We only figure that out at runtime though, but we know for sure + // that it implements client.Object at this point so we can hardcode the event + // type to that. + evt := event.CreateEvent{Object: any(e.Object).(client.Object)} + var priority int + if isObjectUnchanged(evt) { + priority = LowPriority + } + q.(priorityqueue.PriorityQueue[request]).AddWithOpts( + priorityqueue.AddOpts{Priority: priority}, + item, + ) + }, + } + h.CreateFunc(ctx, e, wq) } } @@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe // Update implements EventHandler. func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) { if h.UpdateFunc != nil { - h.UpdateFunc(ctx, e, q) + if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) { + h.UpdateFunc(ctx, e, q) + return + } + + wq := workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: q, + // We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement + // client.Object and that they are not nil + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + var priority int + if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() { + priority = LowPriority + } + q.(priorityqueue.PriorityQueue[request]).AddWithOpts( + priorityqueue.AddOpts{Priority: priority}, + item, + ) + }, + } + h.UpdateFunc(ctx, e, wq) } } @@ -142,43 +199,10 @@ const LowPriority = -100 // WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if // and only if a priorityqueue.PriorityQueue is used. If not, it does nothing. func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { + // TypedFuncs already implements this so just wrap return TypedFuncs[object, request]{ - CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { - // Due to how the handlers are factored, we have to wrap the workqueue to be able - // to inject custom behavior. - u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: trli, - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) - if !isPriorityQueue { - q.Add(item) - return - } - var priority int - if isObjectUnchanged(tce) { - priority = LowPriority - } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) - }, - }) - }, - UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { - u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{ - TypedRateLimitingInterface: trli, - addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) - if !isPriorityQueue { - q.Add(item) - return - } - var priority int - if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() { - priority = LowPriority - } - priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) - }, - }) - }, + CreateFunc: u.Create, + UpdateFunc: u.Update, DeleteFunc: u.Delete, GenericFunc: u.Generic, } @@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) { func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) } + +// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler +// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] +func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + + var priority int + if isObjectUnchanged(evt) { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) +} + +// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler +// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] +func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + + var priority int + if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) +} diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index 6e57c22c3b..748aa3fcc4 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() { }) Describe("Funcs", func() { - failingFuncs := handler.Funcs{ + failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{ CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { defer GinkgoRecover() Fail("Did not expect CreateEvent to be called.") @@ -776,128 +776,224 @@ var _ = Describe("Eventhandler", func() { }) Describe("WithLowPriorityWhenUnchanged", func() { - It("should lower the priority of a create request for an object that was created more than one minute in the past", func() { - actualOpts := priorityqueue.AddOpts{} - var actualRequests []reconcile.Request - wq := &fakePriorityQueue{ - addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { - actualOpts = o - actualRequests = items + handlerPriorityTests := []struct { + name string + handler func() handler.EventHandler + }{ + { + name: "WithLowPriorityWhenUnchanged wrapper", + handler: func() handler.EventHandler { return handler.WithLowPriorityWhenUnchanged(customHandler{}) }, + }, + { + name: "EnqueueRequestForObject", + handler: func() handler.EventHandler { return &handler.EnqueueRequestForObject{} }, + }, + { + name: "EnqueueRequestForOwner", + handler: func() handler.EventHandler { + return handler.EnqueueRequestForOwner( + scheme.Scheme, + mapper, + &corev1.Pod{}, + ) }, - } - - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Create(ctx, event.CreateEvent{ - Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - }, wq) - - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) - Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) - }) - - It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() { - actualOpts := priorityqueue.AddOpts{} - var actualRequests []reconcile.Request - wq := &fakePriorityQueue{ - addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { - actualOpts = o - actualRequests = items + }, + { + name: "TypedEnqueueRequestForOwner", + handler: func() handler.EventHandler { + return handler.TypedEnqueueRequestForOwner[client.Object]( + scheme.Scheme, + mapper, + &corev1.Pod{}, + ) }, - } - - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Create(ctx, event.CreateEvent{ - Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - CreationTimestamp: metav1.Now(), - }}, - }, wq) - - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) - Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) - }) - - It("should lower the priority of an update request with unchanged RV", func() { - actualOpts := priorityqueue.AddOpts{} - var actualRequests []reconcile.Request - wq := &fakePriorityQueue{ - addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { - actualOpts = o - actualRequests = items + }, + { + name: "Funcs", + handler: func() handler.EventHandler { + return handler.TypedFuncs[client.Object, reconcile.Request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tce.Object.GetNamespace(), + Name: tce.Object.GetName(), + }}) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[client.Object], wq workqueue.TypedRateLimitingInterface[reconcile.Request]) { + wq.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: tue.ObjectNew.GetNamespace(), + Name: tue.ObjectNew.GetName(), + }}) + }, + } }, - } + }, + { + name: "EnqueueRequestsFromMapFunc", + handler: func() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { + return []reconcile.Request{{NamespacedName: types.NamespacedName{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }}} + }) + }, + }, + } + for _, test := range handlerPriorityTests { + When("handler is "+test.name, func() { + It("should lower the priority of a create request for an object that was created more than one minute in the past", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Update(ctx, event.UpdateEvent{ - ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - }, wq) + test.handler().Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) - Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) - }) + It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } - It("should not lower the priority of an update request with changed RV", func() { - actualOpts := priorityqueue.AddOpts{} - var actualRequests []reconcile.Request - wq := &fakePriorityQueue{ - addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { - actualOpts = o - actualRequests = items - }, - } + test.handler().Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + CreationTimestamp: metav1.Now(), + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Update(ctx, event.UpdateEvent{ - ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - ResourceVersion: "1", - }}, - }, wq) + It("should lower the priority of an update request with unchanged RV", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } - Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) - Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) - }) + test.handler().Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) - It("should have no effect on create if the workqueue is not a priorityqueue", func() { - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Create(ctx, event.CreateEvent{ - Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - }, q) + It("should not lower the priority of an update request with changed RV", func() { + actualOpts := priorityqueue.AddOpts{} + var actualRequests []reconcile.Request + wq := &fakePriorityQueue{ + addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) { + actualOpts = o + actualRequests = items + }, + } - Expect(q.Len()).To(Equal(1)) - item, _ := q.Get() - Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) - }) + test.handler().Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + ResourceVersion: "1", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, wq) + + Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) + Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) + }) - It("should have no effect on Update if the workqueue is not a priorityqueue", func() { - h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{}) - h.Update(ctx, event.UpdateEvent{ - ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", - }}, - }, q) + It("should have no effect on create if the workqueue is not a priorityqueue", func() { + test.handler().Create(ctx, event.CreateEvent{ + Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, q) + + Expect(q.Len()).To(Equal(1)) + item, _ := q.Get() + Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) + }) - Expect(q.Len()).To(Equal(1)) - item, _ := q.Get() - Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) - }) + It("should have no effect on Update if the workqueue is not a priorityqueue", func() { + test.handler().Update(ctx, event.UpdateEvent{ + ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "my-pod", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Pod", + Name: "my-pod", + }}, + }}, + }, q) + + Expect(q.Len()).To(Equal(1)) + item, _ := q.Get() + Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}})) + }) + }) + } }) - }) type fakePriorityQueue struct { @@ -905,9 +1001,42 @@ type fakePriorityQueue struct { addWithOpts func(o priorityqueue.AddOpts, items ...reconcile.Request) } +func (f *fakePriorityQueue) Add(item reconcile.Request) { + f.AddWithOpts(priorityqueue.AddOpts{}, item) +} + func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) { f.addWithOpts(o, items...) } func (f *fakePriorityQueue) GetWithPriority() (item reconcile.Request, priority int, shutdown bool) { panic("GetWithPriority is not expected to be called") } + +// customHandler re-implements the basic enqueueRequestForObject logic +// to be able to test the WithLowPriorityWhenUnchanged wrapper +type customHandler struct{} + +func (ch customHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: evt.Object.GetNamespace(), + Name: evt.Object.GetName(), + }}) +} +func (ch customHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: evt.ObjectNew.GetNamespace(), + Name: evt.ObjectNew.GetName(), + }}) +} +func (ch customHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: evt.Object.GetNamespace(), + Name: evt.Object.GetName(), + }}) +} +func (ch customHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: evt.Object.GetNamespace(), + Name: evt.Object.GetName(), + }}) +} diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index fda25e0641..3f8cfdaa09 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -60,7 +61,7 @@ type Controller[request comparable] struct { // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing - Queue workqueue.TypedRateLimitingInterface[request] + Queue priorityqueue.PriorityQueue[request] // mu is used to synchronize Controller setup mu sync.Mutex @@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - c.Queue = c.NewQueue(c.Name, c.RateLimiter) + queue := c.NewQueue(c.Name, c.RateLimiter) + if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue { + c.Queue = priorityQueue + } else { + c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue} + } go func() { <-ctx.Done() c.Queue.ShutDown() @@ -175,7 +181,16 @@ func (c *Controller[request]) Start(ctx context.Context) error { // caches. errGroup := &errgroup.Group{} for _, watch := range c.startWatches { - log := c.LogConstructor(nil).WithValues("source", fmt.Sprintf("%s", watch)) + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) + + if !ok { + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) + } didStartSyncingSource := &atomic.Bool{} errGroup.Go(func() error { // Use a timeout for starting and syncing the source to avoid silently @@ -191,7 +206,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { sourceStartErrChan <- err return } - syncingSource, ok := watch.(source.SyncingSource) + syncingSource, ok := watch.(source.TypedSyncingSource[request]) if !ok { return } @@ -259,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := c.Queue.Get() + obj, priority, shutdown := c.Queue.GetWithPriority() if shutdown { // Stop working return false @@ -276,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) - c.reconcileHandler(ctx, obj) + c.reconcileHandler(ctx, obj, priority) return true } @@ -299,7 +314,7 @@ func (c *Controller[request]) initMetrics() { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0) } -func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) { +func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { @@ -322,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) if errors.Is(err, reconcile.TerminalError(nil)) { ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc() } else { - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) } ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() @@ -337,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) // We need to drive to stable reconcile loops before queuing due // to result.RequestAfter c.Queue.Forget(req) - c.Queue.AddAfter(req, result.RequeueAfter) + c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: log.V(5).Info("Reconcile done, requeueing") - c.Queue.AddRateLimited(req) + c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: log.V(5).Info("Reconcile successful") @@ -379,3 +394,25 @@ type reconcileIDKey struct{} func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context { return context.WithValue(ctx, reconcileIDKey{}, reconcileID) } + +type priorityQueueWrapper[request comparable] struct { + workqueue.TypedRateLimitingInterface[request] +} + +func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) { + for _, item := range items { + switch { + case opts.RateLimited: + p.TypedRateLimitingInterface.AddRateLimited(item) + case opts.After > 0: + p.TypedRateLimitingInterface.AddAfter(item, opts.After) + default: + p.TypedRateLimitingInterface.Add(item) + } + } +} + +func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) { + item, shutdown := p.TypedRateLimitingInterface.Get() + return item, 0, shutdown +} diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 52f45612f2..bf334d22e8 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" @@ -46,6 +47,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) +type TestRequest struct { + Key string +} + var _ = Describe("controller", func() { var fakeReconcile *fakeReconciler var ctrl *Controller[reconcile.Request] @@ -340,6 +345,42 @@ var _ = Describe("controller", func() { Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times")) }) + It("should check for correct TypedSyncingSource if custom types are used", func() { + queue := &priorityQueueWrapper[TestRequest]{ + TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{ + TypedInterface: workqueue.NewTyped[TestRequest](), + }} + ctrl := &Controller[TestRequest]{ + NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] { + return queue + }, + LogConstructor: func(*TestRequest) logr.Logger { + return log.RuntimeLog.WithName("controller").WithName("test") + }, + } + ctrl.CacheSyncTimeout = time.Second + src := &bisignallingSource[TestRequest]{ + startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]), + startDone: make(chan error, 1), + waitCall: make(chan struct{}), + waitDone: make(chan error, 1), + } + ctrl.startWatches = []source.TypedSource[TestRequest]{src} + ctrl.Name = "foo" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + startCh := make(chan error) + go func() { + defer GinkgoRecover() + startCh <- ctrl.Start(ctx) + }() + Eventually(src.startCall).Should(Receive(Equal(queue))) + src.startDone <- nil + Eventually(src.waitCall).Should(BeClosed()) + src.waitDone <- nil + cancel() + Eventually(startCh).Should(Receive(Succeed())) + }) }) Describe("Processing queue items from a Controller", func() { @@ -361,10 +402,6 @@ var _ = Describe("controller", func() { Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) }) - PIt("should forget an item if it is not a Request and continue processing items", func() { - // TODO(community): write this test - }) - It("should requeue a Request if there is an error and continue processing items", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -484,6 +521,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority when the reconciler requests a requeue", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will request a requeue") + fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + RateLimited: true, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { @@ -516,6 +584,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority with RequeAfter", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will ask for RequeueAfter") + fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + After: time.Millisecond * 100, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() { dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)} ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { @@ -547,6 +646,37 @@ var _ = Describe("controller", func() { Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0)) }) + It("should retain the priority when there was an error", func() { + q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")} + ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + return q + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request) + + By("Invoking Reconciler which will return an error") + fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again")) + Expect(<-reconciled).To(Equal(request)) + Eventually(func() []priorityQueueAddition { + q.lock.Lock() + defer q.lock.Unlock() + return q.added + }).Should(Equal([]priorityQueueAddition{{ + AddOpts: priorityqueue.AddOpts{ + RateLimited: true, + Priority: 10, + }, + items: []reconcile.Request{request}, + }})) + }) + PIt("should return if the queue is shutdown", func() { // TODO(community): write this test }) @@ -901,3 +1031,58 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte <-ctx.Done() return nil, errors.New("GetInformer timed out") } + +type bisignallingSource[T comparable] struct { + // receives the queue that is passed to Start + startCall chan workqueue.TypedRateLimitingInterface[T] + // passes an error to return from Start + startDone chan error + // closed when WaitForSync is called + waitCall chan struct{} + // passes an error to return from WaitForSync + waitDone chan error +} + +var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil) + +func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error { + select { + case t.startCall <- q: + case <-ctx.Done(): + return ctx.Err() + } + select { + case err := <-t.startDone: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error { + close(t.waitCall) + select { + case err := <-t.waitDone: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +type priorityQueueAddition struct { + priorityqueue.AddOpts + items []reconcile.Request +} + +type fakePriorityQueue struct { + priorityqueue.PriorityQueue[reconcile.Request] + + lock sync.Mutex + added []priorityQueueAddition +} + +func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) { + f.lock.Lock() + defer f.lock.Unlock() + f.added = append(f.added, priorityQueueAddition{AddOpts: o, items: items}) +} diff --git a/pkg/webhook/admission/multi.go b/pkg/webhook/admission/multi.go index 2f7820d04b..ef9c456248 100644 --- a/pkg/webhook/admission/multi.go +++ b/pkg/webhook/admission/multi.go @@ -31,6 +31,7 @@ type multiMutating []Handler func (hs multiMutating) Handle(ctx context.Context, req Request) Response { patches := []jsonpatch.JsonPatchOperation{} + warnings := []string{} for _, handler := range hs { resp := handler.Handle(ctx, req) if !resp.Allowed { @@ -42,6 +43,7 @@ func (hs multiMutating) Handle(ctx context.Context, req Request) Response { resp.PatchType, admissionv1.PatchTypeJSONPatch)) } patches = append(patches, resp.Patches...) + warnings = append(warnings, resp.Warnings...) } var err error marshaledPatch, err := json.Marshal(patches) @@ -55,6 +57,7 @@ func (hs multiMutating) Handle(ctx context.Context, req Request) Response { Code: http.StatusOK, }, Patch: marshaledPatch, + Warnings: warnings, PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(), }, } @@ -71,11 +74,13 @@ func MultiMutatingHandler(handlers ...Handler) Handler { type multiValidating []Handler func (hs multiValidating) Handle(ctx context.Context, req Request) Response { + warnings := []string{} for _, handler := range hs { resp := handler.Handle(ctx, req) if !resp.Allowed { return resp } + warnings = append(warnings, resp.Warnings...) } return Response{ AdmissionResponse: admissionv1.AdmissionResponse{ @@ -83,6 +88,7 @@ func (hs multiValidating) Handle(ctx context.Context, req Request) Response { Result: &metav1.Status{ Code: http.StatusOK, }, + Warnings: warnings, }, } } diff --git a/pkg/webhook/admission/multi_test.go b/pkg/webhook/admission/multi_test.go index da85a52e42..d41675ab30 100644 --- a/pkg/webhook/admission/multi_test.go +++ b/pkg/webhook/admission/multi_test.go @@ -46,6 +46,17 @@ var _ = Describe("Multi-Handler Admission Webhooks", func() { }, } + withWarnings := &fakeHandler{ + fn: func(ctx context.Context, req Request) Response { + return Response{ + AdmissionResponse: admissionv1.AdmissionResponse{ + Allowed: true, + Warnings: []string{"handler-warning"}, + }, + } + }, + } + Context("with validating handlers", func() { It("should deny the request if any handler denies the request", func() { By("setting up a handler with accept and deny") @@ -54,6 +65,7 @@ var _ = Describe("Multi-Handler Admission Webhooks", func() { By("checking that the handler denies the request") resp := handler.Handle(context.Background(), Request{}) Expect(resp.Allowed).To(BeFalse()) + Expect(resp.Warnings).To(BeEmpty()) }) It("should allow the request if all handlers allow the request", func() { @@ -63,6 +75,17 @@ var _ = Describe("Multi-Handler Admission Webhooks", func() { By("checking that the handler allows the request") resp := handler.Handle(context.Background(), Request{}) Expect(resp.Allowed).To(BeTrue()) + Expect(resp.Warnings).To(BeEmpty()) + }) + + It("should show the warnings if all handlers allow the request", func() { + By("setting up a handler with only accept") + handler := MultiValidatingHandler(alwaysAllow, withWarnings) + + By("checking that the handler allows the request") + resp := handler.Handle(context.Background(), Request{}) + Expect(resp.Allowed).To(BeTrue()) + Expect(resp.Warnings).To(HaveLen(1)) }) }) @@ -107,6 +130,25 @@ var _ = Describe("Multi-Handler Admission Webhooks", func() { }, } + patcher3 := &fakeHandler{ + fn: func(ctx context.Context, req Request) Response { + return Response{ + Patches: []jsonpatch.JsonPatchOperation{ + { + Operation: "add", + Path: "/metadata/annotation/newest-key", + Value: "value", + }, + }, + AdmissionResponse: admissionv1.AdmissionResponse{ + Allowed: true, + Warnings: []string{"annotation-warning"}, + PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(), + }, + } + }, + } + It("should not return any patches if the request is denied", func() { By("setting up a webhook with some patches and a deny") handler := MultiMutatingHandler(patcher1, patcher2, alwaysDeny) @@ -128,5 +170,20 @@ var _ = Describe("Multi-Handler Admission Webhooks", func() { `[{"op":"add","path":"/metadata/annotation/new-key","value":"new-value"},` + `{"op":"replace","path":"/spec/replicas","value":"2"},{"op":"add","path":"/metadata/annotation/hello","value":"world"}]`))) }) + + It("should produce all patches if the requests are all allowed and show warnings", func() { + By("setting up a webhook with some patches") + handler := MultiMutatingHandler(patcher1, patcher2, alwaysAllow, patcher3) + + By("checking that the handler accepts the request and returns all patches") + resp := handler.Handle(context.Background(), Request{}) + Expect(resp.Allowed).To(BeTrue()) + Expect(resp.Patch).To(Equal([]byte( + `[{"op":"add","path":"/metadata/annotation/new-key","value":"new-value"},` + + `{"op":"replace","path":"/spec/replicas","value":"2"},{"op":"add","path":"/metadata/annotation/hello","value":"world"},` + + `{"op":"add","path":"/metadata/annotation/newest-key","value":"value"}]`))) + Expect(resp.Warnings).To(HaveLen(1)) + }) + }) }) diff --git a/tools/setup-envtest/go.mod b/tools/setup-envtest/go.mod index 87325cf8f0..62b3919514 100644 --- a/tools/setup-envtest/go.mod +++ b/tools/setup-envtest/go.mod @@ -5,24 +5,24 @@ go 1.23.0 require ( github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 - github.com/onsi/ginkgo/v2 v2.21.0 - github.com/onsi/gomega v1.35.1 - github.com/spf13/afero v1.6.0 - github.com/spf13/pflag v1.0.5 - go.uber.org/zap v1.26.0 - k8s.io/apimachinery v0.32.0 + github.com/onsi/ginkgo/v2 v2.22.2 + github.com/onsi/gomega v1.36.2 + github.com/spf13/afero v1.12.0 + github.com/spf13/pflag v1.0.6 + go.uber.org/zap v1.27.0 + k8s.io/apimachinery v0.32.1 sigs.k8s.io/yaml v1.4.0 ) require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect + github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/net v0.30.0 // indirect - golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.19.0 // indirect - golang.org/x/tools v0.26.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + golang.org/x/net v0.33.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/tools v0.28.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/tools/setup-envtest/go.sum b/tools/setup-envtest/go.sum index 4d82b7d429..aad1fa3e11 100644 --- a/tools/setup-envtest/go.sum +++ b/tools/setup-envtest/go.sum @@ -1,4 +1,3 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -10,56 +9,41 @@ github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= -github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= -github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= -github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= -github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg= +github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU= +github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY= -github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= +github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= -golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/apimachinery v0.32.0 h1:cFSE7N3rmEEtv4ei5X6DaJPHHX0C+upp+v5lVPiEwpg= -k8s.io/apimachinery v0.32.0/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/apimachinery v0.32.1 h1:683ENpaCBjma4CYqsmZyhEzrGz6cjn1MY/X2jB2hkZs= +k8s.io/apimachinery v0.32.1/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/tools/setup-envtest/main.go b/tools/setup-envtest/main.go index 3121e206fd..7eb5ec43d3 100644 --- a/tools/setup-envtest/main.go +++ b/tools/setup-envtest/main.go @@ -184,6 +184,9 @@ Commands: reads a .tar.gz file from stdin and expand it into the store. must have a concrete version and platform. + version: + list the installed version of setup-envtest. + Versions: Versions take the form of a small subset of semver selectors. @@ -256,7 +259,6 @@ Environment Variables: version = flag.Arg(1) } env := setupEnv(globalLog, version) - // perform our main set of actions switch action := flag.Arg(0); action { case "use": @@ -274,6 +276,8 @@ Environment Variables: Input: os.Stdin, PrintFormat: printFormat, }.Do(env) + case "version": + workflows.Version{}.Do(env) default: flag.Usage() envp.Exit(2, "unknown action %q", action) diff --git a/tools/setup-envtest/version/version.go b/tools/setup-envtest/version/version.go new file mode 100644 index 0000000000..58e7e309a4 --- /dev/null +++ b/tools/setup-envtest/version/version.go @@ -0,0 +1,21 @@ +package version + +import "runtime/debug" + +// Version to be set using ldflags: +// -ldflags "-X sigs.k8s.io/controller-tools/pkg/version.version=v1.0.0" +// falls back to module information is unse +var version = "" + +// Version returns the version of the main module +func Version() string { + if version != "" { + return version + } + info, ok := debug.ReadBuildInfo() + if !ok || info == nil || info.Main.Version == "" { + // binary has not been built with module support or doesn't contain a version. + return "(unknown)" + } + return info.Main.Version +} diff --git a/tools/setup-envtest/version/version_suite_test.go b/tools/setup-envtest/version/version_suite_test.go new file mode 100644 index 0000000000..99c623e8d4 --- /dev/null +++ b/tools/setup-envtest/version/version_suite_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestVersioning(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Test Version Suite") +} diff --git a/tools/setup-envtest/version/version_test.go b/tools/setup-envtest/version/version_test.go new file mode 100644 index 0000000000..4178cac870 --- /dev/null +++ b/tools/setup-envtest/version/version_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2024 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +See the License for the specific language governing permissions and + +limitations under the License. +*/ + +package version + +import ( + "runtime/debug" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("TestVersion", func() { + + info, ok := debug.ReadBuildInfo() + Expect(ok).To(BeTrue()) + tests := map[string]struct { + version string + expected string + }{ + "empty returns build info": { + version: "", + expected: info.Main.Version, + }, + "set to a value returns it": { + version: "1.2.3", + expected: "1.2.3", + }, + } + for name, tc := range tests { + It("Version set to "+name, func() { + versionBackup := version + defer func() { + version = versionBackup + }() + version = tc.version + result := Version() + Expect(result).To(Equal(tc.expected)) + }) + } +}) diff --git a/tools/setup-envtest/workflows/workflows.go b/tools/setup-envtest/workflows/workflows.go index fdabd995ae..fb9123d269 100644 --- a/tools/setup-envtest/workflows/workflows.go +++ b/tools/setup-envtest/workflows/workflows.go @@ -5,11 +5,13 @@ package workflows import ( "context" + "fmt" "io" "github.com/go-logr/logr" envp "sigs.k8s.io/controller-runtime/tools/setup-envtest/env" + "sigs.k8s.io/controller-runtime/tools/setup-envtest/version" ) // Use is a workflow that prints out information about stored @@ -85,3 +87,12 @@ func (f Sideload) Do(env *envp.Env) { env.Sideload(ctx, f.Input) env.PrintInfo(f.PrintFormat) } + +// Version is the workflow that shows the current binary version +// of setup-envtest. +type Version struct{} + +// Do executes the workflow. +func (v Version) Do(env *envp.Env) { + fmt.Fprintf(env.Out, "setup-envtest version: %s\n", version.Version()) +} diff --git a/tools/setup-envtest/workflows/workflows_test.go b/tools/setup-envtest/workflows/workflows_test.go index 27d4ec6770..435ae24285 100644 --- a/tools/setup-envtest/workflows/workflows_test.go +++ b/tools/setup-envtest/workflows/workflows_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io/fs" "path/filepath" + "runtime/debug" "sort" "strings" @@ -443,4 +444,16 @@ var _ = Describe("Workflows", func() { Expect(string(outContents)).To(HavePrefix(expectedPrefix), "should have the debugging prefix") }) }) + + Describe("version", func() { + It("should print out the version if the RELEASE_TAG is empty", func() { + v := wf.Version{} + v.Do(env) + info, ok := debug.ReadBuildInfo() + Expect(ok).To(BeTrue()) + Expect(out.String()).ToNot(BeEmpty()) + Expect(out.String()).To(Equal(fmt.Sprintf("setup-envtest version: %s\n", info.Main.Version))) + }) + }) + })