Change TargetProvider interface.

This commit changes the TargetProvider interface to use a
context.Context and send lists of TargetGroups, rather than
single ones.
This commit is contained in:
Fabian Reinartz 2016-02-18 17:26:27 +01:00
parent bb6dc3ff78
commit 5b30bdb610
11 changed files with 141 additions and 263 deletions

View file

@ -15,7 +15,6 @@ package discovery
import ( import (
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -24,6 +23,7 @@ import (
consul "github.com/hashicorp/consul/api" consul "github.com/hashicorp/consul/api"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -113,52 +113,24 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) {
return cd, nil return cd, nil
} }
// Sources implements the TargetProvider interface.
func (cd *ConsulDiscovery) Sources() []string {
clientConf := *cd.clientConf
clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second}
client, err := consul.NewClient(&clientConf)
if err != nil {
// NewClient always returns a nil error.
panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err))
}
srvs, _, err := client.Catalog().Services(nil)
if err != nil {
log.Errorf("Error refreshing service list: %s", err)
return nil
}
cd.mu.Lock()
defer cd.mu.Unlock()
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) == 0 || ok {
srcs = append(srcs, name)
}
}
return srcs
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
defer cd.stop() defer cd.stop()
update := make(chan *consulService, 10) update := make(chan *consulService, 10)
go cd.watchServices(update, done) go cd.watchServices(update, ctx.Done())
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case srv := <-update: case srv := <-update:
if srv.removed { if srv.removed {
close(srv.done) close(srv.done)
// Send clearing update. // Send clearing update.
ch <- config.TargetGroup{Source: srv.name} ch <- []*config.TargetGroup{{Source: srv.name}}
break break
} }
// Launch watcher for the service. // Launch watcher for the service.
@ -244,7 +216,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
// watchService retrieves updates about srv from Consul's service endpoint. // watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch. // On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) {
catalog := cd.client.Catalog() catalog := cd.client.Catalog()
for { for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
@ -288,7 +260,11 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.Tar
default: default:
// Continue. // Continue.
} }
ch <- srv.tgroup // TODO(fabxc): do a copy for now to avoid races. The integration
// needs some general cleanup.
tg := srv.tgroup
ch <- []*config.TargetGroup{&tg}
cd.mu.Unlock() cd.mu.Unlock()
} }
} }

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -91,7 +92,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (dd *DNSDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(dd.interval) ticker := time.NewTicker(dd.interval)
@ -104,22 +105,13 @@ func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
select { select {
case <-ticker.C: case <-ticker.C:
dd.refreshAll(ch) dd.refreshAll(ch)
case <-done: case <-ctx.Done():
return return
} }
} }
} }
// Sources implements the TargetProvider interface. func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) {
func (dd *DNSDiscovery) Sources() []string {
var srcs []string
for _, name := range dd.names {
srcs = append(srcs, name)
}
return srcs
}
func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(dd.names)) wg.Add(len(dd.names))
for _, name := range dd.names { for _, name := range dd.names {
@ -133,7 +125,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
wg.Wait() wg.Wait()
} }
func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { func (dd *DNSDiscovery) refresh(name string, ch chan<- []*config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype) response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
@ -141,7 +133,8 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error
return err return err
} }
var tg config.TargetGroup tg := &config.TargetGroup{}
for _, record := range response.Answer { for _, record := range response.Answer {
target := model.LabelValue("") target := model.LabelValue("")
switch addr := record.(type) { switch addr := record.(type) {
@ -166,7 +159,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error
} }
tg.Source = name tg.Source = name
ch <- tg ch <- []*config.TargetGroup{tg}
return nil return nil
} }

View file

@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/aws/defaults" "github.com/aws/aws-sdk-go/aws/defaults"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ec2"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -46,7 +47,6 @@ const (
// the TargetProvider interface. // the TargetProvider interface.
type EC2Discovery struct { type EC2Discovery struct {
aws *aws.Config aws *aws.Config
done chan struct{}
interval time.Duration interval time.Duration
port int port int
} }
@ -62,14 +62,13 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
Region: &conf.Region, Region: &conf.Region,
Credentials: creds, Credentials: creds,
}, },
done: make(chan struct{}),
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
port: conf.Port, port: conf.Port,
} }
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(ed.interval) ticker := time.NewTicker(ed.interval)
@ -80,7 +79,7 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- *tg ch <- []*config.TargetGroup{tg}
} }
for { for {
@ -90,19 +89,14 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- *tg ch <- []*config.TargetGroup{tg}
} }
case <-done: case <-ctx.Done():
return return
} }
} }
} }
// Sources implements the TargetProvider interface.
func (ed *EC2Discovery) Sources() []string {
return []string{*ed.aws.Region}
}
func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) { func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) {
ec2s := ec2.New(ed.aws) ec2s := ec2.New(ed.aws)
tg := &config.TargetGroup{ tg := &config.TargetGroup{

View file

@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"gopkg.in/fsnotify.v1" "gopkg.in/fsnotify.v1"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
@ -53,23 +54,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
} }
} }
// Sources implements the TargetProvider interface.
func (fd *FileDiscovery) Sources() []string {
var srcs []string
// As we allow multiple target groups per file we have no choice
// but to parse them all.
for _, p := range fd.listFiles() {
tgroups, err := readFile(p)
if err != nil {
log.Errorf("Error reading file %q: %s", p, err)
}
for _, tg := range tgroups {
srcs = append(srcs, tg.Source)
}
}
return srcs
}
// listFiles returns a list of all files that match the configured patterns. // listFiles returns a list of all files that match the configured patterns.
func (fd *FileDiscovery) listFiles() []string { func (fd *FileDiscovery) listFiles() []string {
var paths []string var paths []string
@ -103,7 +87,7 @@ func (fd *FileDiscovery) watchFiles() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
defer fd.stop() defer fd.stop()
@ -123,11 +107,11 @@ func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{})
// Stopping has priority over refreshing. Thus we wrap the actual select // Stopping has priority over refreshing. Thus we wrap the actual select
// clause to always catch done signals. // clause to always catch done signals.
select { select {
case <-done: case <-ctx.Done():
return return
default: default:
select { select {
case <-done: case <-ctx.Done():
return return
case event := <-fd.watcher.Events: case event := <-fd.watcher.Events:
@ -188,7 +172,7 @@ func (fd *FileDiscovery) stop() {
// refresh reads all files matching the discovery's patterns and sends the respective // refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. // updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) {
ref := map[string]int{} ref := map[string]int{}
for _, p := range fd.listFiles() { for _, p := range fd.listFiles() {
tgroups, err := readFile(p) tgroups, err := readFile(p)
@ -198,9 +182,8 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
ref[p] = fd.lastRefresh[p] ref[p] = fd.lastRefresh[p]
continue continue
} }
for _, tg := range tgroups { ch <- tgroups
ch <- *tg
}
ref[p] = len(tgroups) ref[p] = len(tgroups)
} }
// Send empty updates for sources that disappeared. // Send empty updates for sources that disappeared.
@ -208,7 +191,9 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
for i := m; i < n; i++ { for i := m; i < n; i++ {
ch <- config.TargetGroup{Source: fileSource(f, i)} ch <- []*config.TargetGroup{
{Source: fileSource(f, i)},
}
} }
} }
} }

View file

@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
) )
@ -28,16 +29,16 @@ func testFileSD(t *testing.T, ext string) {
var ( var (
fsd = NewFileDiscovery(&conf) fsd = NewFileDiscovery(&conf)
ch = make(chan config.TargetGroup) ch = make(chan []*config.TargetGroup)
done = make(chan struct{}) ctx, cancel = context.WithCancel(context.Background())
) )
go fsd.Run(ch, done) go fsd.Run(ctx, ch)
select { select {
case <-time.After(25 * time.Millisecond): case <-time.After(25 * time.Millisecond):
// Expected. // Expected.
case tg := <-ch: case tgs := <-ch:
t.Fatalf("Unexpected target group in file discovery: %s", tg) t.Fatalf("Unexpected target groups in file discovery: %s", tgs)
} }
newf, err := os.Create("fixtures/_test" + ext) newf, err := os.Create("fixtures/_test" + ext)
@ -58,37 +59,37 @@ func testFileSD(t *testing.T, ext string) {
} }
newf.Close() newf.Close()
// The files contain two target groups which are read and sent in order. // The files contain two target groups.
select { select {
case <-time.After(15 * time.Second): case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none") t.Fatalf("Expected new target group but got none")
case tg := <-ch: case tgs := <-ch:
tg := tgs[0]
if _, ok := tg.Labels["foo"]; !ok { if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed") t.Fatalf("Label not parsed")
} }
if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) { if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) {
t.Fatalf("Unexpected target group %s", tg) t.Fatalf("Unexpected target group %s", tg)
} }
}
select { tg = tgs[1]
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) { if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) {
t.Fatalf("Unexpected target group %s", tg) t.Fatalf("Unexpected target groups %s", tg)
} }
} }
// Based on unknown circumstances, sometimes fsnotify will trigger more events in // Based on unknown circumstances, sometimes fsnotify will trigger more events in
// some runs (which might be empty, chains of different operations etc.). // some runs (which might be empty, chains of different operations etc.).
// We have to drain those (as the target manager would) to avoid deadlocking and must // We have to drain those (as the target manager would) to avoid deadlocking and must
// not try to make sense of it all... // not try to make sense of it all...
drained := make(chan struct{}) drained := make(chan struct{})
go func() { go func() {
for tg := range ch { for tgs := range ch {
// Below we will change the file to a bad syntax. Previously extracted target // Below we will change the file to a bad syntax. Previously extracted target
// groups must not be deleted via sending an empty target group. // groups must not be deleted via sending an empty target group.
if len(tg.Targets) == 0 { if len(tgs[0].Targets) == 0 {
t.Errorf("Unexpected empty target group received: %s", tg) t.Errorf("Unexpected empty target groups received: %s", tgs)
} }
} }
close(drained) close(drained)
@ -107,6 +108,6 @@ func testFileSD(t *testing.T, ext string) {
os.Rename(newf.Name(), "fixtures/_test"+ext) os.Rename(newf.Name(), "fixtures/_test"+ext)
close(done) cancel()
<-drained <-drained
} }

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil" "github.com/prometheus/prometheus/util/httputil"
@ -94,75 +95,35 @@ func (kd *Discovery) Initialize() error {
return nil return nil
} }
// Sources implements the TargetProvider interface.
func (kd *Discovery) Sources() []string {
sourceNames := make([]string, 0, len(kd.apiServers))
for _, apiServer := range kd.apiServers {
sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host)
}
nodes, _, err := kd.getNodes()
if err != nil {
// If we can't list nodes then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to initialize Kubernetes nodes: %s", err)
return []string{}
}
sourceNames = append(sourceNames, kd.nodeSources(nodes)...)
services, _, err := kd.getServices()
if err != nil {
// If we can't list services then we can't watch them. Assume this is a misconfiguration
// & log & return empty.
log.Errorf("Unable to initialize Kubernetes services: %s", err)
return []string{}
}
sourceNames = append(sourceNames, kd.serviceSources(services)...)
return sourceNames
}
func (kd *Discovery) nodeSources(nodes map[string]*Node) []string {
var sourceNames []string
for name := range nodes {
sourceNames = append(sourceNames, nodesTargetGroupName+":"+name)
}
return sourceNames
}
func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string {
var sourceNames []string
for _, ns := range services {
for _, service := range ns {
sourceNames = append(sourceNames, serviceSource(service))
}
}
return sourceNames
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
if tg := kd.updateAPIServersTargetGroup(); tg != nil { // Send an initial full view.
// TODO(fabxc): this does not include all available services and service
// endpoints yet. Service endpoints were also missing in the previous Sources() method.
var all []*config.TargetGroup
all = append(all, kd.updateAPIServersTargetGroup())
all = append(all, kd.updateNodesTargetGroup())
select { select {
case ch <- *tg: case ch <- all:
case <-done: case <-ctx.Done():
return return
} }
}
retryInterval := time.Duration(kd.Conf.RetryInterval) retryInterval := time.Duration(kd.Conf.RetryInterval)
update := make(chan interface{}, 10) update := make(chan interface{}, 10)
go kd.watchNodes(update, done, retryInterval) go kd.watchNodes(update, ctx.Done(), retryInterval)
go kd.startServiceWatch(update, done, retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval)
var tg *config.TargetGroup var tg *config.TargetGroup
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case event := <-update: case event := <-update:
switch obj := event.(type) { switch obj := event.(type) {
@ -181,8 +142,8 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
} }
select { select {
case ch <- *tg: case ch <- []*config.TargetGroup{tg}:
case <-done: case <-ctx.Done():
return return
} }
} }

View file

@ -17,6 +17,8 @@ import (
"time" "time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon" "github.com/prometheus/prometheus/retrieval/discovery/marathon"
) )
@ -40,25 +42,13 @@ func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
} }
} }
// Sources implements the TargetProvider interface.
func (md *MarathonDiscovery) Sources() []string {
var sources []string
tgroups, err := md.fetchTargetGroups()
if err == nil {
for source := range tgroups {
sources = append(sources, source)
}
}
return sources
}
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) defer close(ch)
for { for {
select { select {
case <-done: case <-ctx.Done():
return return
case <-time.After(md.refreshInterval): case <-time.After(md.refreshInterval):
err := md.updateServices(ch) err := md.updateServices(ch)
@ -69,23 +59,24 @@ func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struc
} }
} }
func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups() targetMap, err := md.fetchTargetGroups()
if err != nil { if err != nil {
return err return err
} }
// Update services which are still present all := make([]*config.TargetGroup, 0, len(targetMap))
for _, tg := range targetMap { for _, tg := range targetMap {
ch <- *tg all = append(all, tg)
} }
ch <- all
// Remove services which did disappear // Remove services which did disappear
for source := range md.lastRefresh { for source := range md.lastRefresh {
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
log.Debugf("Removing group for %s", source) log.Debugf("Removing group for %s", source)
ch <- config.TargetGroup{Source: source} ch <- []*config.TargetGroup{{Source: source}}
} }
} }

View file

@ -19,6 +19,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon" "github.com/prometheus/prometheus/retrieval/discovery/marathon"
@ -26,8 +27,8 @@ import (
var marathonValidLabel = map[string]string{"prometheus": "yes"} var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) {
ch := make(chan config.TargetGroup) ch := make(chan []*config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{ md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"}, Servers: []string{"http://localhost:8080"},
}) })
@ -60,7 +61,9 @@ func TestMarathonSDEmptyList(t *testing.T) {
go func() { go func() {
select { select {
case tg := <-ch: case tg := <-ch:
if len(tg) > 0 {
t.Fatalf("Got group: %v", tg) t.Fatalf("Got group: %v", tg)
}
default: default:
} }
}() }()
@ -96,7 +99,9 @@ func TestMarathonSDSendGroup(t *testing.T) {
}) })
go func() { go func() {
select { select {
case tg := <-ch: case tgs := <-ch:
tg := tgs[0]
if tg.Source != "test-service" { if tg.Source != "test-service" {
t.Fatalf("Wrong target group name: %s", tg.Source) t.Fatalf("Wrong target group name: %s", tg.Source)
} }
@ -121,9 +126,10 @@ func TestMarathonSDRemoveApp(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
}) })
go func() { go func() {
up1 := <-ch up1 := (<-ch)[0]
up2 := <-ch up2 := (<-ch)[0]
if up2.Source != up1.Source { if up2.Source != up1.Source {
t.Fatalf("Source is different: %s", up2) t.Fatalf("Source is different: %s", up2)
if len(up2.Targets) > 0 { if len(up2.Targets) > 0 {
@ -145,33 +151,25 @@ func TestMarathonSDRemoveApp(t *testing.T) {
} }
} }
func TestMarathonSDSources(t *testing.T) {
_, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
sources := md.Sources()
if len(sources) != 1 {
t.Fatalf("Wrong number of sources: %s", sources)
}
}
func TestMarathonSDRunAndStop(t *testing.T) { func TestMarathonSDRunAndStop(t *testing.T) {
ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
}) })
md.refreshInterval = time.Millisecond * 10 md.refreshInterval = time.Millisecond * 10
done := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
select { select {
case <-ch: case <-ch:
close(done) cancel()
case <-time.After(md.refreshInterval * 3): case <-time.After(md.refreshInterval * 3):
close(done) cancel()
t.Fatalf("Update took too long.") t.Fatalf("Update took too long.")
} }
}() }()
md.Run(ch, done)
md.Run(ctx, ch)
select { select {
case <-ch: case <-ch:
default: default:

View file

@ -21,6 +21,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/treecache" "github.com/prometheus/prometheus/util/treecache"
@ -47,7 +48,7 @@ type NerveDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- config.TargetGroup sdUpdates *chan<- []*config.TargetGroup
updates chan treecache.ZookeeperTreeCacheEvent updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache treeCaches []*treecache.ZookeeperTreeCache
} }
@ -73,17 +74,6 @@ func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery {
return sd return sd
} }
// Sources implements the TargetProvider interface.
func (sd *NerveDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *NerveDiscovery) processUpdates() { func (sd *NerveDiscovery) processUpdates() {
defer sd.conn.Close() defer sd.conn.Close()
for event := range sd.updates { for event := range sd.updates {
@ -104,7 +94,7 @@ func (sd *NerveDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- *tg *sd.sdUpdates <- []*config.TargetGroup{tg}
} }
} }
@ -114,17 +104,22 @@ func (sd *NerveDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *NerveDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *NerveDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- *targetGroup all := make([]*config.TargetGroup, 0, len(sd.sources))
for _, tg := range sd.sources {
all = append(all, tg)
} }
ch <- all
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch
sd.mu.Unlock() sd.mu.Unlock()
<-done <-ctx.Done()
for _, tc := range sd.treeCaches { for _, tc := range sd.treeCaches {
tc.Stop() tc.Stop()
} }

View file

@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/samuel/go-zookeeper/zk" "github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/strutil" "github.com/prometheus/prometheus/util/strutil"
@ -57,7 +58,7 @@ type ServersetDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- config.TargetGroup sdUpdates *chan<- []*config.TargetGroup
updates chan treecache.ZookeeperTreeCacheEvent updates chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache treeCaches []*treecache.ZookeeperTreeCache
} }
@ -83,17 +84,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery {
return sd return sd
} }
// Sources implements the TargetProvider interface.
func (sd *ServersetDiscovery) Sources() []string {
sd.mu.RLock()
defer sd.mu.RUnlock()
srcs := []string{}
for t := range sd.sources {
srcs = append(srcs, t)
}
return srcs
}
func (sd *ServersetDiscovery) processUpdates() { func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close() defer sd.conn.Close()
for event := range sd.updates { for event := range sd.updates {
@ -114,7 +104,7 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- *tg *sd.sdUpdates <- []*config.TargetGroup{tg}
} }
} }
@ -124,17 +114,22 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *ServersetDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- *targetGroup all := make([]*config.TargetGroup, 0, len(sd.sources))
for _, tg := range sd.sources {
all = append(all, tg)
} }
ch <- all
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch
sd.mu.Unlock() sd.mu.Unlock()
<-done <-ctx.Done()
for _, tc := range sd.treeCaches { for _, tc := range sd.treeCaches {
tc.Stop() tc.Stop()
} }
@ -142,8 +137,8 @@ func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan stru
func parseServersetMember(data []byte, path string) (*model.LabelSet, error) { func parseServersetMember(data []byte, path string) (*model.LabelSet, error) {
member := serversetMember{} member := serversetMember{}
err := json.Unmarshal(data, &member)
if err != nil { if err := json.Unmarshal(data, &member); err != nil {
return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err) return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err)
} }

View file

@ -40,7 +40,7 @@ type TargetProvider interface {
// updated target groups. The channel must be closed by the target provider // updated target groups. The channel must be closed by the target provider
// if no more updates will be sent. // if no more updates will be sent.
// On receiving from done Run must return. // On receiving from done Run must return.
Run(up chan<- config.TargetGroup, done <-chan struct{}) Run(ctx context.Context, up chan<- []*config.TargetGroup)
} }
// TargetManager maintains a set of targets, starts and stops their scraping and // TargetManager maintains a set of targets, starts and stops their scraping and
@ -178,7 +178,7 @@ func (ss *scrapeSet) run(ctx context.Context) {
for name, prov := range providers { for name, prov := range providers {
var ( var (
updates = make(chan config.TargetGroup) updates = make(chan []*config.TargetGroup)
) )
wg.Add(1) wg.Add(1)
@ -192,23 +192,17 @@ func (ss *scrapeSet) run(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
ss.stopScrapers(name) ss.stopScrapers(name)
return return
case update := <-updates: case tgs := <-updates:
if err := ss.update(name, &update); err != nil { for _, tg := range tgs {
log.With("target_group", update).Errorf("Target update failed: %s", err) if err := ss.update(name, tg); err != nil {
log.With("target_group", tg).Errorf("Target update failed: %s", err)
}
} }
} }
} }
}(name, prov) }(name, prov)
done := make(chan struct{}) go prov.Run(ctx, updates)
// TODO(fabxc): Adjust the TargetProvider interface so we can remove this
// redirection of the termination signal.
go func() {
<-ctx.Done()
close(done)
}()
go prov.Run(updates, done)
} }
wg.Wait() wg.Wait()
@ -421,21 +415,16 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups { for i, tg := range groups {
tg.Source = fmt.Sprintf("%d", i) tg.Source = fmt.Sprintf("%d", i)
} }
return &StaticProvider{ return &StaticProvider{groups}
TargetGroups: groups,
}
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer close(ch) // We still have to consider that the consumer exits right away in which case
// the context will be canceled.
for _, tg := range sd.TargetGroups {
select { select {
case <-done: case ch <- sd.TargetGroups:
return case <-ctx.Done():
case ch <- *tg:
} }
} close(ch)
<-done
} }