mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
Merge pull request #5749 from meridional/fix-close
Fix potential panic when prometheus is watching multiple zookeeper paths
This commit is contained in:
commit
dd598f906d
|
@ -108,8 +108,9 @@ type Discovery struct {
|
||||||
|
|
||||||
sources map[string]*targetgroup.Group
|
sources map[string]*targetgroup.Group
|
||||||
|
|
||||||
updates chan treecache.ZookeeperTreeCacheEvent
|
updates chan treecache.ZookeeperTreeCacheEvent
|
||||||
treeCaches []*treecache.ZookeeperTreeCache
|
pathUpdates []chan treecache.ZookeeperTreeCacheEvent
|
||||||
|
treeCaches []*treecache.ZookeeperTreeCache
|
||||||
|
|
||||||
parse func(data []byte, path string) (model.LabelSet, error)
|
parse func(data []byte, path string) (model.LabelSet, error)
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
@ -155,7 +156,9 @@ func NewDiscovery(
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
for _, path := range paths {
|
for _, path := range paths {
|
||||||
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger))
|
pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent)
|
||||||
|
sd.pathUpdates = append(sd.pathUpdates, pathUpdate)
|
||||||
|
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger))
|
||||||
}
|
}
|
||||||
return sd, nil
|
return sd, nil
|
||||||
}
|
}
|
||||||
|
@ -166,12 +169,26 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
||||||
for _, tc := range d.treeCaches {
|
for _, tc := range d.treeCaches {
|
||||||
tc.Stop()
|
tc.Stop()
|
||||||
}
|
}
|
||||||
// Drain event channel in case the treecache leaks goroutines otherwise.
|
for _, pathUpdate := range d.pathUpdates {
|
||||||
for range d.updates {
|
// Drain event channel in case the treecache leaks goroutines otherwise.
|
||||||
|
for range pathUpdate {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
d.conn.Close()
|
d.conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
for _, pathUpdate := range d.pathUpdates {
|
||||||
|
go func(update chan treecache.ZookeeperTreeCacheEvent) {
|
||||||
|
for event := range update {
|
||||||
|
select {
|
||||||
|
case d.updates <- event:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(pathUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
Loading…
Reference in a new issue