From 9229811c943ab69bd2d6dfdf76568c0efff30f18 Mon Sep 17 00:00:00 2001 From: Ye Ji Date: Tue, 9 Jul 2019 16:18:30 -0400 Subject: [PATCH] give each tree cache its unique channel to avoid multiple close on the same channel Signed-off-by: Ye Ji --- discovery/zookeeper/zookeeper.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index ee2785721..019af105f 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -108,8 +108,9 @@ type Discovery struct { sources map[string]*targetgroup.Group - updates chan treecache.ZookeeperTreeCacheEvent - treeCaches []*treecache.ZookeeperTreeCache + updates chan treecache.ZookeeperTreeCacheEvent + pathUpdates []chan treecache.ZookeeperTreeCacheEvent + treeCaches []*treecache.ZookeeperTreeCache parse func(data []byte, path string) (model.LabelSet, error) logger log.Logger @@ -155,7 +156,9 @@ func NewDiscovery( logger: logger, } for _, path := range paths { - sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger)) + pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent) + sd.pathUpdates = append(sd.pathUpdates, pathUpdate) + sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger)) } return sd, nil } @@ -166,12 +169,26 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { for _, tc := range d.treeCaches { tc.Stop() } - // Drain event channel in case the treecache leaks goroutines otherwise. - for range d.updates { + for _, pathUpdate := range d.pathUpdates { + // Drain event channel in case the treecache leaks goroutines otherwise. + for range pathUpdate { + } } d.conn.Close() }() + for _, pathUpdate := range d.pathUpdates { + go func(update chan treecache.ZookeeperTreeCacheEvent) { + for event := range update { + select { + case d.updates <- event: + case <-ctx.Done(): + return + } + } + }(pathUpdate) + } + for { select { case <-ctx.Done():