Skip to content

Commit a0dfd95

Browse files
authored
Merge pull request kubernetes#107163 from cyclinder/fix_leak_goroutine
fix goroutine leaks in TestConfigurationChannels
2 parents ca4af7a + 928e686 commit a0dfd95

File tree

5 files changed

+93
-31
lines changed

5 files changed

+93
-31
lines changed

pkg/kubelet/config/config.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package config
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"reflect"
2223
"sync"
@@ -81,11 +82,11 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)
8182

8283
// Channel creates or returns a config source channel. The channel
8384
// only accepts PodUpdates
84-
func (c *PodConfig) Channel(source string) chan<- interface{} {
85+
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
8586
c.sourcesLock.Lock()
8687
defer c.sourcesLock.Unlock()
8788
c.sources.Insert(source)
88-
return c.mux.Channel(source)
89+
return c.mux.ChannelWithContext(ctx, source)
8990
}
9091

9192
// SeenAllSources returns true if seenSources contains all sources in the

pkg/kubelet/config/config_test.go

+59-16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package config
1818

1919
import (
20+
"context"
2021
"math/rand"
2122
"reflect"
2223
"sort"
@@ -85,10 +86,10 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
8586
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
8687
}
8788

88-
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
89+
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
8990
eventBroadcaster := record.NewBroadcaster()
9091
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
91-
channel := config.Channel(TestSource)
92+
channel := config.Channel(ctx, TestSource)
9293
ch := config.Updates()
9394
return channel, ch, config
9495
}
@@ -129,7 +130,10 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
129130
}
130131

131132
func TestNewPodAdded(t *testing.T) {
132-
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
133+
ctx, cancel := context.WithCancel(context.Background())
134+
defer cancel()
135+
136+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
133137

134138
// see an update
135139
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -141,7 +145,10 @@ func TestNewPodAdded(t *testing.T) {
141145
}
142146

143147
func TestNewPodAddedInvalidNamespace(t *testing.T) {
144-
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
148+
ctx, cancel := context.WithCancel(context.Background())
149+
defer cancel()
150+
151+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
145152

146153
// see an update
147154
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
@@ -153,7 +160,10 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
153160
}
154161

155162
func TestNewPodAddedDefaultNamespace(t *testing.T) {
156-
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
163+
ctx, cancel := context.WithCancel(context.Background())
164+
defer cancel()
165+
166+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
157167

158168
// see an update
159169
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@@ -165,7 +175,10 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
165175
}
166176

167177
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
168-
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
178+
ctx, cancel := context.WithCancel(context.Background())
179+
defer cancel()
180+
181+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
169182

170183
// see an update
171184
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@@ -182,7 +195,10 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
182195
}
183196

184197
func TestInvalidPodFiltered(t *testing.T) {
185-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
198+
ctx, cancel := context.WithCancel(context.Background())
199+
defer cancel()
200+
201+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
186202

187203
// see an update
188204
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -196,7 +212,10 @@ func TestInvalidPodFiltered(t *testing.T) {
196212
}
197213

198214
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
199-
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
215+
ctx, cancel := context.WithCancel(context.Background())
216+
defer cancel()
217+
218+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)
200219

201220
// see an set
202221
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -214,7 +233,10 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
214233
}
215234

216235
func TestNewPodAddedSnapshot(t *testing.T) {
217-
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
236+
ctx, cancel := context.WithCancel(context.Background())
237+
defer cancel()
238+
239+
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)
218240

219241
// see an set
220242
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -232,7 +254,10 @@ func TestNewPodAddedSnapshot(t *testing.T) {
232254
}
233255

234256
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
235-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
257+
ctx, cancel := context.WithCancel(context.Background())
258+
defer cancel()
259+
260+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
236261

237262
// should register an add
238263
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -255,7 +280,10 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
255280
}
256281

257282
func TestNewPodAddedDelete(t *testing.T) {
258-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
283+
ctx, cancel := context.WithCancel(context.Background())
284+
defer cancel()
285+
286+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
259287

260288
// should register an add
261289
addedPod := CreateValidPod("foo", "new")
@@ -274,7 +302,10 @@ func TestNewPodAddedDelete(t *testing.T) {
274302
}
275303

276304
func TestNewPodAddedUpdatedSet(t *testing.T) {
277-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
305+
ctx, cancel := context.WithCancel(context.Background())
306+
defer cancel()
307+
308+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
278309

279310
// should register an add
280311
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
@@ -296,6 +327,9 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
296327
}
297328

298329
func TestNewPodAddedSetReconciled(t *testing.T) {
330+
ctx, cancel := context.WithCancel(context.Background())
331+
defer cancel()
332+
299333
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
300334
// before touching to avoid data race.
301335
newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
@@ -318,7 +352,7 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
318352
} {
319353
var podWithStatusChange *v1.Pod
320354
pods, _ := newTestPods(false, false)
321-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
355+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
322356

323357
// Use SET to initialize the config, especially initialize the source set
324358
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
@@ -341,6 +375,9 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
341375
}
342376

343377
func TestInitialEmptySet(t *testing.T) {
378+
ctx, cancel := context.WithCancel(context.Background())
379+
defer cancel()
380+
344381
for _, test := range []struct {
345382
mode PodConfigNotificationMode
346383
op kubetypes.PodOperation
@@ -349,7 +386,7 @@ func TestInitialEmptySet(t *testing.T) {
349386
{PodConfigNotificationSnapshot, kubetypes.SET},
350387
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
351388
} {
352-
channel, ch, _ := createPodConfigTester(test.mode)
389+
channel, ch, _ := createPodConfigTester(ctx, test.mode)
353390

354391
// should register an empty PodUpdate operation
355392
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
@@ -366,7 +403,10 @@ func TestInitialEmptySet(t *testing.T) {
366403
}
367404

368405
func TestPodUpdateAnnotations(t *testing.T) {
369-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
406+
ctx, cancel := context.WithCancel(context.Background())
407+
defer cancel()
408+
409+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
370410

371411
pod := CreateValidPod("foo2", "new")
372412
pod.Annotations = make(map[string]string)
@@ -395,7 +435,10 @@ func TestPodUpdateAnnotations(t *testing.T) {
395435
}
396436

397437
func TestPodUpdateLabels(t *testing.T) {
398-
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
438+
ctx, cancel := context.WithCancel(context.Background())
439+
defer cancel()
440+
441+
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
399442

400443
pod := CreateValidPod("foo2", "new")
401444
pod.Labels = make(map[string]string)

pkg/kubelet/kubelet.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -270,21 +270,24 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
270270
// source of all configuration
271271
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
272272

273+
// TODO: it needs to be replaced by a proper context in the future
274+
ctx := context.TODO()
275+
273276
// define file config source
274277
if kubeCfg.StaticPodPath != "" {
275278
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
276-
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
279+
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
277280
}
278281

279282
// define url config source
280283
if kubeCfg.StaticPodURL != "" {
281284
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
282-
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
285+
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
283286
}
284287

285288
if kubeDeps.KubeClient != nil {
286289
klog.InfoS("Adding apiserver pod source")
287-
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
290+
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
288291
}
289292
return cfg, nil
290293
}

pkg/util/config/config.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package config
1818

1919
import (
20+
"context"
2021
"sync"
2122

2223
"k8s.io/apimachinery/pkg/util/wait"
@@ -57,12 +58,12 @@ func NewMux(merger Merger) *Mux {
5758
return mux
5859
}
5960

60-
// Channel returns a channel where a configuration source
61+
// ChannelWithContext returns a channel where a configuration source
6162
// can send updates of new configurations. Multiple calls with the same
6263
// source will return the same channel. This allows change and state based sources
6364
// to use the same channel. Different source names however will be treated as a
6465
// union.
65-
func (m *Mux) Channel(source string) chan interface{} {
66+
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
6667
if len(source) == 0 {
6768
panic("Channel given an empty name")
6869
}
@@ -74,7 +75,8 @@ func (m *Mux) Channel(source string) chan interface{} {
7475
}
7576
newChannel := make(chan interface{})
7677
m.sources[source] = newChannel
77-
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
78+
79+
go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
7880
return newChannel
7981
}
8082

pkg/util/config/config_test.go

+20-7
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@ limitations under the License.
1717
package config
1818

1919
import (
20+
"context"
2021
"reflect"
2122
"testing"
2223
)
2324

2425
func TestConfigurationChannels(t *testing.T) {
26+
ctx, cancel := context.WithCancel(context.Background())
27+
defer cancel()
28+
2529
mux := NewMux(nil)
26-
channelOne := mux.Channel("one")
27-
if channelOne != mux.Channel("one") {
30+
channelOne := mux.ChannelWithContext(ctx, "one")
31+
if channelOne != mux.ChannelWithContext(ctx, "one") {
2832
t.Error("Didn't get the same muxuration channel back with the same name")
2933
}
30-
channelTwo := mux.Channel("two")
34+
channelTwo := mux.ChannelWithContext(ctx, "two")
3135
if channelOne == channelTwo {
3236
t.Error("Got back the same muxuration channel for different names")
3337
}
@@ -50,12 +54,18 @@ func (m MergeMock) Merge(source string, update interface{}) error {
5054
}
5155

5256
func TestMergeInvoked(t *testing.T) {
57+
ctx, cancel := context.WithCancel(context.Background())
58+
defer cancel()
59+
5360
merger := MergeMock{"one", "test", t}
5461
mux := NewMux(&merger)
55-
mux.Channel("one") <- "test"
62+
mux.ChannelWithContext(ctx, "one") <- "test"
5663
}
5764

5865
func TestMergeFuncInvoked(t *testing.T) {
66+
ctx, cancel := context.WithCancel(context.Background())
67+
defer cancel()
68+
5969
ch := make(chan bool)
6070
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
6171
if source != "one" {
@@ -67,11 +77,14 @@ func TestMergeFuncInvoked(t *testing.T) {
6777
ch <- true
6878
return nil
6979
}))
70-
mux.Channel("one") <- "test"
80+
mux.ChannelWithContext(ctx, "one") <- "test"
7181
<-ch
7282
}
7383

7484
func TestSimultaneousMerge(t *testing.T) {
85+
ctx, cancel := context.WithCancel(context.Background())
86+
defer cancel()
87+
7588
ch := make(chan bool, 2)
7689
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
7790
switch source {
@@ -89,8 +102,8 @@ func TestSimultaneousMerge(t *testing.T) {
89102
ch <- true
90103
return nil
91104
}))
92-
source := mux.Channel("one")
93-
source2 := mux.Channel("two")
105+
source := mux.ChannelWithContext(ctx, "one")
106+
source2 := mux.ChannelWithContext(ctx, "two")
94107
source <- "test"
95108
source2 <- "test2"
96109
<-ch

0 commit comments

Comments
 (0)