Fix SD mechanism source prefix handling.

The prefixed target provider modified a target group through a pointer that
the wrapped target provider kept reusing, causing an ever-increasing chain
of source prefixes in target groups from the Consul target provider.

We now make this bug generally impossible by switching the target group
channel from pointer to value type and thus ensuring that target groups
are copied before being passed on to other parts of the system.

I tried not to let the depointerization leak too far outside of the
channel handling (both upstream and downstream): my initial attempt at a
broader change caused some nasty bugs, a risk I want to minimize.

Fixes https://github.com/prometheus/prometheus/issues/1083
This commit is contained in:
Julius Volz 2015-10-08 18:06:58 +02:00
parent 0088aa4d45
commit d88aea7e6f
12 changed files with 67 additions and 63 deletions

View file

@ -66,7 +66,7 @@ type ConsulDiscovery struct {
// consulService contains data belonging to the same service. // consulService contains data belonging to the same service.
type consulService struct { type consulService struct {
name string name string
tgroup *config.TargetGroup tgroup config.TargetGroup
lastIndex uint64 lastIndex uint64
removed bool removed bool
running bool running bool
@ -143,7 +143,7 @@ func (cd *ConsulDiscovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
defer cd.stop() defer cd.stop()
@ -159,7 +159,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct
close(srv.done) close(srv.done)
// Send clearing update. // Send clearing update.
ch <- &config.TargetGroup{Source: srv.name} ch <- config.TargetGroup{Source: srv.name}
break break
} }
// Launch watcher for the service. // Launch watcher for the service.
@ -219,9 +219,8 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
srv, ok := cd.services[name] srv, ok := cd.services[name]
if !ok { if !ok {
srv = &consulService{ srv = &consulService{
name: name, name: name,
tgroup: &config.TargetGroup{}, done: make(chan struct{}),
done: make(chan struct{}),
} }
srv.tgroup.Source = name srv.tgroup.Source = name
cd.services[name] = srv cd.services[name] = srv
@ -246,7 +245,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
// watchService retrieves updates about srv from Consul's service endpoint. // watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch. // On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) {
catalog := cd.client.Catalog() catalog := cd.client.Catalog()
for { for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{

View file

@ -91,7 +91,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(dd.interval) ticker := time.NewTicker(dd.interval)
@ -119,7 +119,7 @@ func (dd *DNSDiscovery) Sources() []string {
return srcs return srcs
} }
func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(dd.names)) wg.Add(len(dd.names))
for _, name := range dd.names { for _, name := range dd.names {
@ -133,7 +133,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) {
wg.Wait() wg.Wait()
} }
func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error { func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype) response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
@ -141,7 +141,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro
return err return err
} }
tg := &config.TargetGroup{} var tg config.TargetGroup
for _, record := range response.Answer { for _, record := range response.Answer {
target := model.LabelValue("") target := model.LabelValue("")
switch addr := record.(type) { switch addr := record.(type) {

View file

@ -62,7 +62,7 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ticker := time.NewTicker(ed.interval) ticker := time.NewTicker(ed.interval)
@ -73,7 +73,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- tg ch <- *tg
} }
for { for {
@ -83,7 +83,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
ch <- tg ch <- *tg
} }
case <-done: case <-done:
return return

View file

@ -103,7 +103,7 @@ func (fd *FileDiscovery) watchFiles() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
defer fd.stop() defer fd.stop()
@ -188,7 +188,7 @@ func (fd *FileDiscovery) stop() {
// refresh reads all files matching the discovery's patterns and sends the respective // refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. // updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
ref := map[string]int{} ref := map[string]int{}
for _, p := range fd.listFiles() { for _, p := range fd.listFiles() {
tgroups, err := readFile(p) tgroups, err := readFile(p)
@ -199,7 +199,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
continue continue
} }
for _, tg := range tgroups { for _, tg := range tgroups {
ch <- tg ch <- *tg
} }
ref[p] = len(tgroups) ref[p] = len(tgroups)
} }
@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
for i := m; i < n; i++ { for i := m; i < n; i++ {
ch <- &config.TargetGroup{Source: fileSource(f, i)} ch <- config.TargetGroup{Source: fileSource(f, i)}
} }
} }
} }

View file

@ -26,7 +26,7 @@ func testFileSD(t *testing.T, ext string) {
var ( var (
fsd = NewFileDiscovery(&conf) fsd = NewFileDiscovery(&conf)
ch = make(chan *config.TargetGroup) ch = make(chan config.TargetGroup)
done = make(chan struct{}) done = make(chan struct{})
) )
go fsd.Run(ch, done) go fsd.Run(ch, done)

View file

@ -173,25 +173,35 @@ func (kd *Discovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
select { if tg := kd.updateMastersTargetGroup(); tg != nil {
case ch <- kd.updateMastersTargetGroup(): select {
case <-done: case ch <- *tg:
return case <-done:
return
}
} }
select { if tg := kd.updateNodesTargetGroup(); tg != nil {
case ch <- kd.updateNodesTargetGroup(): select {
case <-done: case ch <- *tg:
return case <-done:
return
}
} }
for _, ns := range kd.services { for _, ns := range kd.services {
for _, service := range ns { for _, service := range ns {
tg := kd.addService(service)
if tg == nil {
continue
}
select { select {
case ch <- kd.addService(service): case ch <- *tg:
case <-done: case <-done:
return return
} }
@ -223,8 +233,12 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
} }
} }
if tg == nil {
continue
}
select { select {
case ch <- tg: case ch <- *tg:
case <-done: case <-done:
return return
} }

View file

@ -53,7 +53,7 @@ func (md *MarathonDiscovery) Sources() []string {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for { for {
@ -69,7 +69,7 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan stru
} }
} }
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups() targetMap, err := md.fetchTargetGroups()
if err != nil { if err != nil {
return err return err
@ -77,7 +77,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error
// Update services which are still present // Update services which are still present
for _, tg := range targetMap { for _, tg := range targetMap {
ch <- tg ch <- *tg
} }
// Remove services which did disappear // Remove services which did disappear
@ -85,7 +85,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
log.Debugf("Removing group for %s", source) log.Debugf("Removing group for %s", source)
ch <- &config.TargetGroup{Source: source} ch <- config.TargetGroup{Source: source}
} }
} }

View file

@ -26,8 +26,8 @@ import (
var marathonValidLabel = map[string]string{"prometheus": "yes"} var marathonValidLabel = map[string]string{"prometheus": "yes"}
func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) { func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) {
ch := make(chan *config.TargetGroup) ch := make(chan config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{ md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"}, Servers: []string{"http://localhost:8080"},
}) })

View file

@ -67,7 +67,7 @@ type ServersetDiscovery struct {
conn *zk.Conn conn *zk.Conn
mu sync.RWMutex mu sync.RWMutex
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup sdUpdates *chan<- config.TargetGroup
updates chan zookeeperTreeCacheEvent updates chan zookeeperTreeCacheEvent
treeCaches []*zookeeperTreeCache treeCaches []*zookeeperTreeCache
} }
@ -124,7 +124,7 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
sd.mu.Unlock() sd.mu.Unlock()
if sd.sdUpdates != nil { if sd.sdUpdates != nil {
*sd.sdUpdates <- tg *sd.sdUpdates <- *tg
} }
} }
@ -134,11 +134,11 @@ func (sd *ServersetDiscovery) processUpdates() {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
// Send on everything we have seen so far. // Send on everything we have seen so far.
sd.mu.Lock() sd.mu.Lock()
for _, targetGroup := range sd.sources { for _, targetGroup := range sd.sources {
ch <- targetGroup ch <- *targetGroup
} }
// Tell processUpdates to send future updates. // Tell processUpdates to send future updates.
sd.sdUpdates = &ch sd.sdUpdates = &ch

View file

@ -52,12 +52,12 @@ type fakeTargetProvider struct {
update chan *config.TargetGroup update chan *config.TargetGroup
} }
func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (tp *fakeTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for { for {
select { select {
case tg := <-tp.update: case tg := <-tp.update:
ch <- tg ch <- *tg
case <-done: case <-done:
return return
} }

View file

@ -43,7 +43,7 @@ type TargetProvider interface {
// updated target groups. The channel must be closed by the target provider // updated target groups. The channel must be closed by the target provider
// if no more updates will be sent. // if no more updates will be sent.
// On receiving from done Run must return. // On receiving from done Run must return.
Run(up chan<- *config.TargetGroup, done <-chan struct{}) Run(up chan<- config.TargetGroup, done <-chan struct{})
} }
// TargetManager maintains a set of targets, starts and stops their scraping and // TargetManager maintains a set of targets, starts and stops their scraping and
@ -105,7 +105,7 @@ func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGr
// targetGroupUpdate is a potentially changed/new target group // targetGroupUpdate is a potentially changed/new target group
// for the given scrape configuration. // for the given scrape configuration.
type targetGroupUpdate struct { type targetGroupUpdate struct {
tg *config.TargetGroup tg config.TargetGroup
scfg *config.ScrapeConfig scfg *config.ScrapeConfig
} }
@ -126,9 +126,9 @@ func (tm *TargetManager) Run() {
sources[src] = struct{}{} sources[src] = struct{}{}
} }
tgc := make(chan *config.TargetGroup) tgc := make(chan config.TargetGroup)
// Run the target provider after cleanup of the stale targets is done. // Run the target provider after cleanup of the stale targets is done.
defer func(prov TargetProvider, tgc chan<- *config.TargetGroup, done <-chan struct{}) { defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) {
go prov.Run(tgc, done) go prov.Run(tgc, done)
}(prov, tgc, tm.done) }(prov, tgc, tm.done)
@ -140,9 +140,6 @@ func (tm *TargetManager) Run() {
for { for {
select { select {
case tg := <-tgc: case tg := <-tgc:
if tg == nil {
break
}
tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} tgupc <- targetGroupUpdate{tg: tg, scfg: scfg}
case <-done: case <-done:
return return
@ -179,12 +176,9 @@ func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan
if !ok { if !ok {
return return
} }
if update.tg == nil {
break
}
log.Debugf("Received potential update for target group %q", update.tg.Source) log.Debugf("Received potential update for target group %q", update.tg.Source)
if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil { if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil {
log.Errorf("Error updating targets: %s", err) log.Errorf("Error updating targets: %s", err)
} }
case <-done: case <-done:
@ -382,10 +376,10 @@ func (tp *prefixedTargetProvider) Sources() []string {
return srcs return srcs
} }
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
ch2 := make(chan *config.TargetGroup) ch2 := make(chan config.TargetGroup)
go tp.TargetProvider.Run(ch2, done) go tp.TargetProvider.Run(ch2, done)
for { for {
@ -393,9 +387,6 @@ func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan
case <-done: case <-done:
return return
case tg := <-ch2: case tg := <-ch2:
if tg == nil {
break
}
tg.Source = tp.prefix(tg.Source) tg.Source = tp.prefix(tg.Source)
ch <- tg ch <- tg
} }
@ -537,14 +528,14 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch) defer close(ch)
for _, tg := range sd.TargetGroups { for _, tg := range sd.TargetGroups {
select { select {
case <-done: case <-done:
return return
case ch <- tg: case ch <- *tg:
} }
} }
<-done <-done

View file

@ -52,7 +52,7 @@ func TestPrefixedTargetProvider(t *testing.T) {
t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) t.Fatalf("expected sources %v, got %v", expSources, tp.Sources())
} }
ch := make(chan *config.TargetGroup) ch := make(chan config.TargetGroup)
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
@ -64,10 +64,10 @@ func TestPrefixedTargetProvider(t *testing.T) {
expGroup2.Source = "job-x:static:123:1" expGroup2.Source = "job-x:static:123:1"
// The static target provider sends on the channel once per target group. // The static target provider sends on the channel once per target group.
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) { if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) {
t.Fatalf("expected target group %v, got %v", expGroup1, tg) t.Fatalf("expected target group %v, got %v", expGroup1, tg)
} }
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) { if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) {
t.Fatalf("expected target group %v, got %v", expGroup2, tg) t.Fatalf("expected target group %v, got %v", expGroup2, tg)
} }
} }