Commit 98b5b22

🐛 Fix(manager): Prevent goroutine leak on shutdown timeout (#3247)
* test: add goroutine leak test for slow runnables during shutdown
* fix: prevent goroutine leak on manager shutdown timeout
* address comments
1 parent 6e35b34 · commit 98b5b22
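For context, the leak this commit fixes is a runnable goroutine blocking forever on errChan after the manager's Start has already returned and nothing drains the channel anymore. A minimal, self-contained illustration of that failure mode (not code from this commit):

package main

import (
	"errors"
	"fmt"
	"runtime"
	"time"
)

func main() {
	errCh := make(chan error) // no reader will ever drain this channel

	go func() {
		// A "slow runnable": it only reports its error after the graceful
		// shutdown timeout has already expired.
		time.Sleep(50 * time.Millisecond)
		errCh <- errors.New("slow runnable error") // blocks forever: nobody is receiving
	}()

	// The "manager" has already stopped reading errCh and moved on.
	time.Sleep(200 * time.Millisecond)
	fmt.Println("goroutines still alive:", runtime.NumGoroutine()) // the sender is stuck above
}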

File tree

3 files changed (+90 −2 lines):

* pkg/manager/manager.go
* pkg/manager/manager_test.go
* pkg/manager/runnable_group.go


pkg/manager/manager.go

Lines changed: 1 addition & 1 deletion
@@ -432,7 +432,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
 	}

 	errChan := make(chan error, 1)
-	runnables := newRunnables(options.BaseContext, errChan)
+	runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger)
 	return &controllerManager{
 		stopProcedureEngaged: ptr.To(int64(0)),
 		cluster:              cluster,

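The only wiring change in manager.go is that the runnable groups now receive Options.Logger, so errors dropped during a shutdown timeout are reported through the logger the user configured. A minimal caller-side sketch of that configuration (not part of this commit; it assumes controller-runtime's zap helper and k8s.io/utils/ptr):

package main

import (
	"time"

	"k8s.io/utils/ptr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// The Logger set here is what newRunnables(...).withLogger(options.Logger)
	// hands to every runnable group, so dropped-error messages during a
	// shutdown timeout surface through it.
	m, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		Logger:                  zap.New(),
		GracefulShutdownTimeout: ptr.To(30 * time.Second), // runnables get this long to stop gracefully
	})
	if err != nil {
		panic(err)
	}
	_ = m
}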
pkg/manager/manager_test.go

Lines changed: 48 additions & 0 deletions
@@ -1905,6 +1905,54 @@ var _ = Describe("manger.Manager", func() {
 			Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
 		})

+		It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func() {
+			// This test reproduces the race condition where the manager's Start method
+			// exits due to context cancellation, leaving no one to drain errChan.
+
+			currentGRs := goleak.IgnoreCurrent()
+
+			// Create a manager with a very short graceful shutdown timeout to reliably trigger the race condition.
+			shortGracefulShutdownTimeout := 10 * time.Millisecond
+			m, err := New(cfg, Options{
+				GracefulShutdownTimeout: &shortGracefulShutdownTimeout,
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Add slow runnables that will return an error after some delay.
+			for i := 0; i < 3; i++ {
+				slowRunnable := RunnableFunc(func(c context.Context) error {
+					<-c.Done()
+
+					// Simulate some work that delays the error from being returned.
+					// Choosing a large delay to reliably trigger the race condition.
+					time.Sleep(100 * time.Millisecond)
+
+					// This simulates the race condition where runnables try to send
+					// errors after the manager has stopped reading from errChan.
+					return errors.New("slow runnable error")
+				})
+
+				Expect(m.Add(slowRunnable)).To(Succeed())
+			}
+
+			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).To(HaveOccurred()) // We expect an error here because the slow runnables return errors
+			}()
+
+			// Wait for the context to be cancelled.
+			<-ctx.Done()
+
+			// Give time for any leaks to become apparent. This makes sure we don't raise a false
+			// alarm on goroutine leaks while runnables are still running.
+			time.Sleep(300 * time.Millisecond)
+
+			// Force-close keep-alive connections.
+			clientTransport.CloseIdleConnections()
+			Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
+		})
+
 		It("should provide a function to get the Config", func() {
 			m, err := New(cfg, Options{})
 			Expect(err).NotTo(HaveOccurred())
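The new test follows goleak's snapshot-and-compare idiom: record the goroutines alive before the scenario, then assert nothing new survives it. A simplified standalone sketch of that idiom (plain go test style rather than the Ginkgo suite above):

package manager_test

import (
	"testing"

	"go.uber.org/goleak"
)

func TestNoGoroutineLeak(t *testing.T) {
	// Snapshot whatever is already running so pre-existing goroutines are ignored.
	before := goleak.IgnoreCurrent()

	// ... start and stop the code under test here ...

	// Fail if any goroutine created after the snapshot is still alive.
	if err := goleak.Find(before); err != nil {
		t.Fatalf("leaked goroutines: %v", err)
	}
}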

pkg/manager/runnable_group.go

Lines changed: 41 additions & 1 deletion
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"sync"

+	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/webhook"
 )

@@ -48,6 +49,16 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
 	}
 }

+// withLogger returns the runnables with the logger set for all runnable groups.
+func (r *runnables) withLogger(logger logr.Logger) *runnables {
+	r.HTTPServers.withLogger(logger)
+	r.Webhooks.withLogger(logger)
+	r.Caches.withLogger(logger)
+	r.LeaderElection.withLogger(logger)
+	r.Others.withLogger(logger)
+	return r
+}
+
 // Add adds a runnable to closest group of runnable that they belong to.
 //
 // Add should be able to be called before and after Start, but not after StopAndWait.
@@ -119,6 +130,9 @@ type runnableGroup struct {
 	// wg is an internal sync.WaitGroup that allows us to properly stop
 	// and wait for all the runnables to finish before returning.
 	wg *sync.WaitGroup
+
+	// logger is used for logging when errors are dropped during shutdown
+	logger logr.Logger
 }

 func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
@@ -127,12 +141,18 @@ func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
 		errChan: errChan,
 		ch:      make(chan *readyRunnable),
 		wg:      new(sync.WaitGroup),
+		logger:  logr.Discard(), // Default to no-op logger
 	}

 	r.ctx, r.cancel = context.WithCancel(baseContext())
 	return r
 }

+// withLogger sets the logger for this runnable group.
+func (r *runnableGroup) withLogger(logger logr.Logger) {
+	r.logger = logger
+}
+
 // Started returns true if the group has started.
 func (r *runnableGroup) Started() bool {
 	r.start.Lock()
@@ -238,7 +258,27 @@ func (r *runnableGroup) reconcile() {

 			// Start the runnable.
 			if err := rn.Start(r.ctx); err != nil {
-				r.errChan <- err
+				// Check whether we're in the shutdown process.
+				r.stop.RLock()
+				isStopped := r.stopped
+				r.stop.RUnlock()
+
+				if isStopped {
+					// During shutdown, try to send the error first (the error drain goroutine might
+					// still be running), but drop it if the send would block, to prevent goroutine leaks.
+					select {
+					case r.errChan <- err:
+						// Error sent successfully (the error drain goroutine is still running).
+					default:
+						// The error drain goroutine has exited; drop the error to prevent a goroutine leak.
+						if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown
+							r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err)
+						}
+					}
+				} else {
+					// During normal operation, always send errors (this may block briefly).
+					r.errChan <- err
+				}
 			}
 		}(runnable)
 	}
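The core of the fix is the non-blocking send once the group is stopped: deliver the error if the drain goroutine is still reading, otherwise drop and log it instead of blocking forever. A standalone sketch of that pattern (illustrative only, not the manager's actual types):

package main

import (
	"errors"
	"fmt"
)

// reportError mimics the shape of the fix: during shutdown the send becomes
// non-blocking, so a sender never hangs on a channel nobody reads anymore.
func reportError(errCh chan error, stopped bool, err error) {
	if !stopped {
		errCh <- err // normal operation: the manager is still draining errCh
		return
	}
	select {
	case errCh <- err:
		// The error drain goroutine is still running; deliver as usual.
	default:
		// Nobody is reading anymore; drop (and log) instead of blocking forever.
		fmt.Println("error dropped during shutdown:", err)
	}
}

func main() {
	errCh := make(chan error, 1)
	errCh <- errors.New("already queued") // fill the buffer so the next plain send would block

	reportError(errCh, true, errors.New("slow runnable error")) // dropped, goroutine not leaked
}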
