mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-28 23:19:41 -08:00
Merge pull request #1985 from tommyulfsparre/resync
Resync state after ZooKeeper failure
This commit is contained in:
commit
5ac89fbc1f
|
@ -19,10 +19,31 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/samuel/go-zookeeper/zk"
|
"github.com/samuel/go-zookeeper/zk"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
failureCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "prometheus",
|
||||||
|
Subsystem: "treecache",
|
||||||
|
Name: "zookeeper_failures_total",
|
||||||
|
Help: "The total number of ZooKeeper failures.",
|
||||||
|
})
|
||||||
|
numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Namespace: "prometheus",
|
||||||
|
Subsystem: "treecache",
|
||||||
|
Name: "watcher_goroutines",
|
||||||
|
Help: "The current number of watcher goroutines.",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(failureCounter)
|
||||||
|
prometheus.MustRegister(numWatchers)
|
||||||
|
}
|
||||||
|
|
||||||
type ZookeeperLogger struct {
|
type ZookeeperLogger struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +102,7 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) {
|
||||||
retryChan := make(chan struct{})
|
retryChan := make(chan struct{})
|
||||||
|
|
||||||
failure := func() {
|
failure := func() {
|
||||||
|
failureCounter.Inc()
|
||||||
failureMode = true
|
failureMode = true
|
||||||
time.AfterFunc(time.Second*10, func() {
|
time.AfterFunc(time.Second*10, func() {
|
||||||
retryChan <- struct{}{}
|
retryChan <- struct{}{}
|
||||||
|
@ -129,13 +151,19 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) {
|
||||||
}
|
}
|
||||||
case <-retryChan:
|
case <-retryChan:
|
||||||
log.Infof("Attempting to resync state with Zookeeper")
|
log.Infof("Attempting to resync state with Zookeeper")
|
||||||
|
previousState := &zookeeperTreeCacheNode{
|
||||||
|
children: tc.head.children,
|
||||||
|
}
|
||||||
// Reset root child nodes before traversing the Zookeeper path.
|
// Reset root child nodes before traversing the Zookeeper path.
|
||||||
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
|
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
|
||||||
err := tc.recursiveNodeUpdate(tc.prefix, tc.head)
|
|
||||||
if err != nil {
|
if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil {
|
||||||
log.Errorf("Error during Zookeeper resync: %s", err)
|
log.Errorf("Error during Zookeeper resync: %s", err)
|
||||||
|
// Revert to our previous state.
|
||||||
|
tc.head.children = previousState.children
|
||||||
failure()
|
failure()
|
||||||
} else {
|
} else {
|
||||||
|
tc.resyncState(tc.prefix, tc.head, previousState)
|
||||||
log.Infof("Zookeeper resync successful")
|
log.Infof("Zookeeper resync successful")
|
||||||
failureMode = false
|
failureMode = false
|
||||||
}
|
}
|
||||||
|
@ -199,6 +227,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
numWatchers.Inc()
|
||||||
// Pass up zookeeper events, until the node is deleted.
|
// Pass up zookeeper events, until the node is deleted.
|
||||||
select {
|
select {
|
||||||
case event := <-dataWatcher:
|
case event := <-dataWatcher:
|
||||||
|
@ -207,10 +236,21 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
|
||||||
node.events <- event
|
node.events <- event
|
||||||
case <-node.done:
|
case <-node.done:
|
||||||
}
|
}
|
||||||
|
numWatchers.Dec()
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tc *ZookeeperTreeCache) resyncState(path string, currentState, previousState *zookeeperTreeCacheNode) {
|
||||||
|
for child, previousNode := range previousState.children {
|
||||||
|
if currentNode, present := currentState.children[child]; present {
|
||||||
|
tc.resyncState(path+"/"+child, currentNode, previousNode)
|
||||||
|
} else {
|
||||||
|
tc.recursiveDelete(path+"/"+child, previousNode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
|
func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
|
||||||
if !node.stopped {
|
if !node.stopped {
|
||||||
node.done <- struct{}{}
|
node.done <- struct{}{}
|
||||||
|
|
Loading…
Reference in a new issue