diff --git a/util/treecache/treecache.go b/util/treecache/treecache.go index 15ad4a7b3..19111e526 100644 --- a/util/treecache/treecache.go +++ b/util/treecache/treecache.go @@ -17,6 +17,7 @@ import ( "bytes" "fmt" "strings" + "sync" "time" "github.com/go-kit/kit/log" @@ -68,6 +69,7 @@ type ZookeeperTreeCache struct { prefix string events chan ZookeeperTreeCacheEvent stop chan struct{} + wg *sync.WaitGroup head *zookeeperTreeCacheNode logger log.Logger @@ -94,14 +96,17 @@ func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTree prefix: path, events: events, stop: make(chan struct{}), + wg: &sync.WaitGroup{}, logger: logger, } tc.head = &zookeeperTreeCacheNode{ events: make(chan zk.Event), children: map[string]*zookeeperTreeCacheNode{}, - stopped: true, + done: make(chan struct{}, 1), + stopped: true, // Set head's stop to be true so that recursiveDelete will not stop the head node. } + tc.wg.Add(1) go tc.loop(path) return tc } @@ -109,9 +114,23 @@ func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTree // Stop stops the tree cache. func (tc *ZookeeperTreeCache) Stop() { tc.stop <- struct{}{} + go func() { + // Drain tc.head.events so that go routines can make progress and exit. + for range tc.head.events { + } + }() + go func() { + tc.wg.Wait() + // Close the tc.head.events after all members of the wait group have exited. + // This makes the go routine above exit. + close(tc.head.events) + close(tc.events) + }() } func (tc *ZookeeperTreeCache) loop(path string) { + defer tc.wg.Done() + failureMode := false retryChan := make(chan struct{}) @@ -185,6 +204,8 @@ func (tc *ZookeeperTreeCache) loop(path string) { failureMode = false } case <-tc.stop: + // Stop head as well. + tc.head.done <- struct{}{} tc.recursiveStop(tc.head) return } @@ -243,6 +264,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr } } + tc.wg.Add(1) go func() { numWatchers.Inc() // Pass up zookeeper events, until the node is deleted. @@ -254,6 +276,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr case <-node.done: } numWatchers.Dec() + tc.wg.Done() }() return nil }