Fix zk connection leak (#5675)

Signed-off-by: Ye Ji <ye@hioscar.com>
This commit is contained in:
Ye Ji 2019-06-21 08:21:13 -04:00 committed by Brian Brazil
parent dc22f74153
commit 2c715d22cf

View file

@ -17,6 +17,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
@ -68,6 +69,7 @@ type ZookeeperTreeCache struct {
prefix string prefix string
events chan ZookeeperTreeCacheEvent events chan ZookeeperTreeCacheEvent
stop chan struct{} stop chan struct{}
wg *sync.WaitGroup
head *zookeeperTreeCacheNode head *zookeeperTreeCacheNode
logger log.Logger logger log.Logger
@ -94,14 +96,17 @@ func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTree
prefix: path, prefix: path,
events: events, events: events,
stop: make(chan struct{}), stop: make(chan struct{}),
wg: &sync.WaitGroup{},
logger: logger, logger: logger,
} }
tc.head = &zookeeperTreeCacheNode{ tc.head = &zookeeperTreeCacheNode{
events: make(chan zk.Event), events: make(chan zk.Event),
children: map[string]*zookeeperTreeCacheNode{}, children: map[string]*zookeeperTreeCacheNode{},
stopped: true, done: make(chan struct{}, 1),
stopped: true, // Set head's stop to be true so that recursiveDelete will not stop the head node.
} }
tc.wg.Add(1)
go tc.loop(path) go tc.loop(path)
return tc return tc
} }
@ -109,9 +114,23 @@ func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTree
// Stop stops the tree cache. // Stop stops the tree cache.
func (tc *ZookeeperTreeCache) Stop() { func (tc *ZookeeperTreeCache) Stop() {
tc.stop <- struct{}{} tc.stop <- struct{}{}
go func() {
// Drain tc.head.events so that go routines can make progress and exit.
for range tc.head.events {
}
}()
go func() {
tc.wg.Wait()
// Close the tc.head.events after all members of the wait group have exited.
// This makes the go routine above exit.
close(tc.head.events)
close(tc.events)
}()
} }
func (tc *ZookeeperTreeCache) loop(path string) { func (tc *ZookeeperTreeCache) loop(path string) {
defer tc.wg.Done()
failureMode := false failureMode := false
retryChan := make(chan struct{}) retryChan := make(chan struct{})
@ -185,6 +204,8 @@ func (tc *ZookeeperTreeCache) loop(path string) {
failureMode = false failureMode = false
} }
case <-tc.stop: case <-tc.stop:
// Stop head as well.
tc.head.done <- struct{}{}
tc.recursiveStop(tc.head) tc.recursiveStop(tc.head)
return return
} }
@ -243,6 +264,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
} }
} }
tc.wg.Add(1)
go func() { go func() {
numWatchers.Inc() numWatchers.Inc()
// Pass up zookeeper events, until the node is deleted. // Pass up zookeeper events, until the node is deleted.
@ -254,6 +276,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
case <-node.done: case <-node.done:
} }
numWatchers.Dec() numWatchers.Dec()
tc.wg.Done()
}() }()
return nil return nil
} }