diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 21bdb6b04..6dc8f5162 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -246,6 +246,7 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) { if failureMode { continue } + if ev.Type == zk.EventNotWatching { log.Infof("Lost connection to Zookeeper.") failure() @@ -265,6 +266,7 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) { } node = childNode } + err := tc.recursiveNodeUpdate(ev.Path, node) if err != nil { log.Errorf("Error during processing of Zookeeper event: %s", err) @@ -276,6 +278,8 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) { } case <-retryChan: log.Infof("Attempting to resync state with Zookeeper") + // Reset root child nodes before traversing the Zookeeper path. + tc.head.children = make(map[string]*zookeeperTreeCacheNode) err := tc.recursiveNodeUpdate(tc.prefix, tc.head) if err != nil { log.Errorf("Error during Zookeeper resync: %s", err) @@ -320,19 +324,21 @@ func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr for _, child := range children { currentChildren[child] = struct{}{} childNode := node.children[child] - // Does not already exists, create it. - if childNode == nil { + // Does not already exist, or we previously had a watch that + // triggered. + if childNode == nil || childNode.stopped { node.children[child] = &zookeeperTreeCacheNode{ events: node.events, children: map[string]*zookeeperTreeCacheNode{}, done: make(chan struct{}, 1), } - } - err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) - if err != nil { - return err + err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) + if err != nil { + return err + } } } + // Remove nodes that no longer exist for name, childNode := range node.children { if _, ok := currentChildren[name]; !ok || node.data == nil {