discovery: sanitize Consul service discovery

This commit simplifies the SD's structure and ensures that all
channel sends are checked against a canceled context.
Author: Fabian Reinartz
Date:   2016-04-25 18:40:39 +02:00
parent 5837e6a97f
commit e805e68c01
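
The pattern named in the message is easiest to see in isolation. Below is a minimal, self-contained sketch of it, not code from this commit; the fetch helper and the string updates are made up for illustration. It shows the two checks the diff adds to each watch loop: a non-blocking select, so the context is inspected at least once per iteration, and a cancel-aware channel send that cannot block forever on a receiver that has gone away.

package main

import (
	"context"
	"fmt"
	"time"
)

// fetch stands in for a blocking watch call, e.g. a Consul query.
func fetch() string {
	time.Sleep(10 * time.Millisecond)
	return "update"
}

// watch mirrors the loop structure the commit establishes.
func watch(ctx context.Context, ch chan<- string) {
	for {
		update := fetch()

		// Check the context at least once per iteration. Cancelation
		// must terminate the loop even if every send below were
		// skipped by a continue.
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Cancel-aware send: block on the receiver or on cancelation,
		// whichever comes first.
		select {
		case <-ctx.Done():
			return
		case ch <- update:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string)

	go watch(ctx, ch)
	fmt.Println(<-ch) // one update arrives

	cancel() // watch exits instead of blocking on ch forever
	time.Sleep(50 * time.Millisecond)
}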


@@ -17,7 +17,6 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
-	"sync"
 	"time"

 	consul "github.com/hashicorp/consul/api"
@@ -57,20 +56,7 @@ type Discovery struct {
 	clientConf       *consul.Config
 	clientDatacenter string
 	tagSeparator     string
-	scrapedServices  map[string]struct{}
-
-	mu       sync.RWMutex
-	services map[string]*consulService
-}
-
-// consulService contains data belonging to the same service.
-type consulService struct {
-	name      string
-	tgroup    config.TargetGroup
-	lastIndex uint64
-	removed   bool
-	running   bool
-	done      chan struct{}
+	scrapedServices  map[string]struct{} // Set of services which will be discovered.
 }

 // NewDiscovery returns a new Discovery for the given config.
@@ -94,7 +80,6 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
 		clientConf:       clientConf,
 		tagSeparator:     conf.TagSeparator,
 		scrapedServices:  map[string]struct{}{},
-		services:         map[string]*consulService{},
 	}
 	// If the datacenter isn't set in the clientConf, let's get it from the local Consul agent
 	// (Consul default is to use local node's datacenter if one isn't given for a query).
@@ -116,46 +101,10 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
 // Run implements the TargetProvider interface.
 func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	defer close(ch)
-	defer cd.stop()
-
-	update := make(chan *consulService, 10)
-	go cd.watchServices(update, ctx.Done())
+
+	// Watched services and their cancelation functions.
+	services := map[string]func(){}

-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case srv := <-update:
-			if srv.removed {
-				close(srv.done)
-				// Send clearing update.
-				ch <- []*config.TargetGroup{{Source: srv.name}}
-				break
-			}
-			// Launch watcher for the service.
-			if !srv.running {
-				go cd.watchService(srv, ch)
-				srv.running = true
-			}
-		}
-	}
-}
-
-func (cd *Discovery) stop() {
-	// The lock prevents Run from terminating while the watchers attempt
-	// to send on their channels.
-	cd.mu.Lock()
-	defer cd.mu.Unlock()
-
-	for _, srv := range cd.services {
-		close(srv.done)
-	}
-}
-
-// watchServices retrieves updates from Consul's services endpoint and sends
-// potential updates to the update channel.
-func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan struct{}) {
 	var lastIndex uint64
 	for {
 		catalog := cd.client.Catalog()
@@ -163,6 +112,15 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str
 			WaitIndex: lastIndex,
 			WaitTime:  watchTimeout,
 		})
+
+		// We have to check the context at least once. The checks during channel sends
+		// do not guarantee that.
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
 		if err != nil {
 			log.Errorf("Error refreshing service list: %s", err)
 			time.Sleep(retryInterval)
@@ -174,74 +132,100 @@ func (cd *Discovery) watchServices(update chan<- *consulService, done <-chan str
 		}
 		lastIndex = meta.LastIndex

-		cd.mu.Lock()
+		// Check for new services.
+		for name := range srvs {
+			// If no restriction on scraped services is set, we scrape everything.
+			if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok {
+				continue
+			}
+			if _, ok := services[name]; ok {
+				continue // We are already watching the service.
+			}
+
+			srv := &consulService{
+				client: cd.client,
+				name:   name,
+				labels: model.LabelSet{
+					serviceLabel:    model.LabelValue(name),
+					datacenterLabel: model.LabelValue(cd.clientDatacenter),
+				},
+				tagSeparator: cd.tagSeparator,
+			}
+
+			wctx, cancel := context.WithCancel(ctx)
+			go srv.watch(wctx, ch)
+
+			services[name] = cancel
+		}
+
+		// Check for removed services.
+		for name, cancel := range services {
+			if _, ok := srvs[name]; !ok {
+				// Call the watch cancelation function.
+				cancel()
+				delete(services, name)
+
+				// Send clearing target group.
+				select {
+				case <-ctx.Done():
+					return
+				case ch <- []*config.TargetGroup{{Source: name}}:
+				}
+			}
+		}
+	}
+}
+
+// consulService contains data belonging to the same service.
+type consulService struct {
+	name         string
+	labels       model.LabelSet
+	client       *consul.Client
+	tagSeparator string
+}
+
+func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) {
+	catalog := srv.client.Catalog()
+
+	lastIndex := uint64(0)
+	for {
+		nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
+			WaitIndex: lastIndex,
+			WaitTime:  watchTimeout,
+		})
+		// Check the context before potentially falling in a continue-loop.
 		select {
-		case <-done:
-			cd.mu.Unlock()
+		case <-ctx.Done():
 			return
 		default:
 			// Continue.
 		}
-		// Check for new services.
-		for name := range srvs {
-			if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) > 0 && !ok {
-				continue
-			}
-			srv, ok := cd.services[name]
-			if !ok {
-				srv = &consulService{
-					name: name,
-					done: make(chan struct{}),
-				}
-				srv.tgroup.Source = name
-				cd.services[name] = srv
-			}
-			srv.tgroup.Labels = model.LabelSet{
-				serviceLabel:    model.LabelValue(name),
-				datacenterLabel: model.LabelValue(cd.clientDatacenter),
-			}
-			update <- srv
-		}
-		// Check for removed services.
-		for name, srv := range cd.services {
-			if _, ok := srvs[name]; !ok {
-				srv.removed = true
-				update <- srv
-				delete(cd.services, name)
-			}
-		}
-		cd.mu.Unlock()
-	}
-}
-
-// watchService retrieves updates about srv from Consul's service endpoint.
-// On a potential update the resulting target group is sent to ch.
-func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) {
-	catalog := cd.client.Catalog()
-	for {
-		nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
-			WaitIndex: srv.lastIndex,
-			WaitTime:  watchTimeout,
-		})
 		if err != nil {
 			log.Errorf("Error refreshing service %s: %s", srv.name, err)
 			time.Sleep(retryInterval)
 			continue
 		}
 		// If the index equals the previous one, the watch timed out with no update.
-		if meta.LastIndex == srv.lastIndex {
+		if meta.LastIndex == lastIndex {
 			continue
 		}
-		srv.lastIndex = meta.LastIndex
-		srv.tgroup.Targets = make([]model.LabelSet, 0, len(nodes))
+		lastIndex = meta.LastIndex
+
+		tgroup := config.TargetGroup{
+			Source:  srv.name,
+			Labels:  srv.labels,
+			Targets: make([]model.LabelSet, 0, len(nodes)),
+		}

 		for _, node := range nodes {
-			addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort)
-			// We surround the separated list with the separator as well. This way regular expressions
-			// in relabeling rules don't have to consider tag positions.
-			tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator
-
-			srv.tgroup.Targets = append(srv.tgroup.Targets, model.LabelSet{
+			var (
+				addr = fmt.Sprintf("%s:%d", node.Address, node.ServicePort)
+				// We surround the separated list with the separator as well. This way regular expressions
+				// in relabeling rules don't have to consider tag positions.
+				tags = srv.tagSeparator + strings.Join(node.ServiceTags, srv.tagSeparator) + srv.tagSeparator
+			)
+
+			tgroup.Targets = append(tgroup.Targets, model.LabelSet{
 				model.AddressLabel: model.LabelValue(addr),
 				addressLabel:       model.LabelValue(node.Address),
 				nodeLabel:          model.LabelValue(node.Node),
@@ -251,20 +235,16 @@ func (cd *Discovery) watchService(srv *consulService, ch chan<- []*config.Target
 				serviceIDLabel:     model.LabelValue(node.ServiceID),
 			})
 		}
-		cd.mu.Lock()
+		// Check context twice to ensure we always catch cancelation.
 		select {
-		case <-srv.done:
-			cd.mu.Unlock()
+		case <-ctx.Done():
 			return
 		default:
-			// Continue.
 		}
-		// TODO(fabxc): do a copy for now to avoid races. The integration
-		// needs needs some general cleanup.
-		tg := srv.tgroup
-		ch <- []*config.TargetGroup{&tg}
-		cd.mu.Unlock()
+
+		select {
+		case <-ctx.Done():
+			return
+		case ch <- []*config.TargetGroup{&tgroup}:
+		}
 	}
 }
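
For readers tracing the new Run loop above: its bookkeeping reduces to a map from service name to the cancel function of a per-service child context. The following sketch is an illustrative reduction under that reading, not code from the diff; reconcile and the sample service names are hypothetical.

package main

import (
	"context"
	"fmt"
	"time"
)

// reconcile starts one watcher goroutine per name in want and cancels
// watchers whose name has disappeared, mirroring Run's bookkeeping.
func reconcile(ctx context.Context, running map[string]func(), want map[string]struct{}) {
	for name := range want {
		if _, ok := running[name]; ok {
			continue // already watching
		}
		wctx, cancel := context.WithCancel(ctx)
		go func(name string) {
			<-wctx.Done() // a real watcher would loop here instead
			fmt.Println("stopped watching", name)
		}(name)
		running[name] = cancel
	}
	for name, cancel := range running {
		if _, ok := want[name]; !ok {
			cancel() // tears down exactly one watcher
			delete(running, name)
		}
	}
}

func main() {
	running := map[string]func(){}
	ctx := context.Background()

	reconcile(ctx, running, map[string]struct{}{"web": {}, "db": {}})
	reconcile(ctx, running, map[string]struct{}{"web": {}}) // "db" went away
	time.Sleep(50 * time.Millisecond)                       // let the message print
}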
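
One detail the diff keeps rather than introduces also deserves a worked example: the tags label surrounds the joined tag list with the separator on both sides. Assuming a separator of "," and tags canary and v2 (both made up here), the label value becomes ",canary,v2,", so a relabeling regexp can match ",canary," without special-casing the first or last position:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	sep := ","
	tags := []string{"canary", "v2"}

	// Surround the joined list with the separator as well, as the
	// comment in the diff describes.
	label := sep + strings.Join(tags, sep) + sep
	fmt.Println(label) // ",canary,v2,"

	// Position-independent match on a single tag.
	fmt.Println(regexp.MustCompile(`,canary,`).MatchString(label)) // true
}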