skip already watched child nodes.

This commit is contained in:
Tommy Ulfsparre 2015-11-16 07:25:54 +01:00
parent 7a6a0630d1
commit 83e09422bf

View file

@ -246,6 +246,7 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) {
if failureMode { if failureMode {
continue continue
} }
if ev.Type == zk.EventNotWatching { if ev.Type == zk.EventNotWatching {
log.Infof("Lost connection to Zookeeper.") log.Infof("Lost connection to Zookeeper.")
failure() failure()
@ -265,6 +266,7 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) {
} }
node = childNode node = childNode
} }
err := tc.recursiveNodeUpdate(ev.Path, node) err := tc.recursiveNodeUpdate(ev.Path, node)
if err != nil { if err != nil {
log.Errorf("Error during processing of Zookeeper event: %s", err) log.Errorf("Error during processing of Zookeeper event: %s", err)
@ -276,6 +278,8 @@ func (tc *zookeeperTreeCache) loop(failureMode bool) {
} }
case <-retryChan: case <-retryChan:
log.Infof("Attempting to resync state with Zookeeper") log.Infof("Attempting to resync state with Zookeeper")
// Reset root child nodes before traversing the Zookeeper path.
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
err := tc.recursiveNodeUpdate(tc.prefix, tc.head) err := tc.recursiveNodeUpdate(tc.prefix, tc.head)
if err != nil { if err != nil {
log.Errorf("Error during Zookeeper resync: %s", err) log.Errorf("Error during Zookeeper resync: %s", err)
@ -320,19 +324,21 @@ func (tc *zookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
for _, child := range children { for _, child := range children {
currentChildren[child] = struct{}{} currentChildren[child] = struct{}{}
childNode := node.children[child] childNode := node.children[child]
// Does not already exists, create it. // Does not already exists or we previous had a watch that
if childNode == nil { // triggered.
if childNode == nil || childNode.stopped {
node.children[child] = &zookeeperTreeCacheNode{ node.children[child] = &zookeeperTreeCacheNode{
events: node.events, events: node.events,
children: map[string]*zookeeperTreeCacheNode{}, children: map[string]*zookeeperTreeCacheNode{},
done: make(chan struct{}, 1), done: make(chan struct{}, 1),
} }
} err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child])
err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child]) if err != nil {
if err != nil { return err
return err }
} }
} }
// Remove nodes that no longer exist // Remove nodes that no longer exist
for name, childNode := range node.children { for name, childNode := range node.children {
if _, ok := currentChildren[name]; !ok || node.data == nil { if _, ok := currentChildren[name]; !ok || node.data == nil {