Skip to content
Permalink
Browse files
Merge pull request #110138 from wojtek-t/fix_leaking_goroutines_in_ku…
…belet_test

Fix leaking goroutines in kubelet integration test
  • Loading branch information
k8s-ci-robot committed May 23, 2022
2 parents 9997897 + 0d41d29 commit e9f1c9cc7ccb23898a08212d995c3f5114e8b338
Showing 2 changed files with 42 additions and 11 deletions.
@@ -164,8 +164,9 @@ type objectCache struct {
clock clock.Clock
maxIdleTime time.Duration

lock sync.RWMutex
items map[objectKey]*objectCacheItem
lock sync.RWMutex
items map[objectKey]*objectCacheItem
stopped bool
}

const minIdleTime = 1 * time.Minute
@@ -178,7 +179,8 @@ func NewObjectCache(
isImmutable isImmutableFunc,
groupResource schema.GroupResource,
clock clock.Clock,
maxIdleTime time.Duration) Store {
maxIdleTime time.Duration,
stopCh <-chan struct{}) Store {

if maxIdleTime < minIdleTime {
maxIdleTime = minIdleTime
@@ -195,8 +197,8 @@ func NewObjectCache(
items: make(map[objectKey]*objectCacheItem),
}

// TODO propagate stopCh from the higher level.
go wait.Until(store.startRecycleIdleWatch, time.Minute, wait.NeverStop)
go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
go store.shutdownWhenStopped(stopCh)
return store
}

@@ -210,7 +212,7 @@ func (c *objectCache) newStore() *cacheStore {
return &cacheStore{store, sync.Mutex{}, false}
}

func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem {
fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
@@ -235,7 +237,11 @@ func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
hasSynced: func() (bool, error) { return store.hasSynced(), nil },
stopCh: make(chan struct{}),
}
go item.startReflector()

// Don't start reflector if Kubelet is already shutting down.
if !c.stopped {
go item.startReflector()
}
return item
}

@@ -251,7 +257,7 @@ func (c *objectCache) AddReference(namespace, name string) {
defer c.lock.Unlock()
item, exists := c.items[key]
if !exists {
item = c.newReflector(namespace, name)
item = c.newReflectorLocked(namespace, name)
c.items[key] = item
}
item.refCount++
@@ -281,6 +287,12 @@ func (c *objectCache) key(namespace, name string) string {
return name
}

func (c *objectCache) isStopped() bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.stopped
}

func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
key := objectKey{namespace: namespace, name: name}

@@ -295,7 +307,10 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
// This protects from premature (racy) reflector closure.
item.setLastAccessTime(c.clock.Now())

item.restartReflectorIfNeeded()
// Don't restart reflector if Kubelet is already shutting down.
if !c.isStopped() {
item.restartReflectorIfNeeded()
}
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
}
@@ -339,6 +354,18 @@ func (c *objectCache) startRecycleIdleWatch() {
}
}

func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) {
<-stopCh

c.lock.Lock()
defer c.lock.Unlock()

c.stopped = true
for _, item := range c.items {
item.stop()
}
}

// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
@@ -360,6 +387,7 @@ func NewWatchBasedManager(
// We currently set it to 5 times.
maxIdleTime := resyncInterval * 5

objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime)
// TODO propagate stopCh from the higher level.
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}
@@ -63,7 +63,10 @@ func TestWatchBasedManager(t *testing.T) {
// So don't treat any secret as immutable here.
isImmutable := func(_ runtime.Object) bool { return false }
fakeClock := testingclock.NewFakeClock(time.Now())
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute)

stopCh := make(chan struct{})
defer close(stopCh)
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)

// create 1000 secrets in parallel
t.Log(time.Now(), "creating 1000 secrets")

0 comments on commit e9f1c9c

Please sign in to comment.