diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 3e2f856f08..172c760a32 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -66,7 +66,7 @@ type ConsulDiscovery struct { // consulService contains data belonging to the same service. type consulService struct { name string - tgroup *config.TargetGroup + tgroup config.TargetGroup lastIndex uint64 removed bool running bool @@ -143,7 +143,7 @@ func (cd *ConsulDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) defer cd.stop() @@ -159,7 +159,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct close(srv.done) // Send clearing update. - ch <- &config.TargetGroup{Source: srv.name} + ch <- config.TargetGroup{Source: srv.name} break } // Launch watcher for the service. @@ -219,9 +219,8 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch srv, ok := cd.services[name] if !ok { srv = &consulService{ - name: name, - tgroup: &config.TargetGroup{}, - done: make(chan struct{}), + name: name, + done: make(chan struct{}), } srv.tgroup.Source = name cd.services[name] = srv @@ -246,7 +245,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch // watchService retrieves updates about srv from Consul's service endpoint. // On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { +func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { catalog := cd.client.Catalog() for { nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index 7e4cb235be..40a8841e1f 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -91,7 +91,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery { } // Run implements the TargetProvider interface. -func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) ticker := time.NewTicker(dd.interval) @@ -119,7 +119,7 @@ func (dd *DNSDiscovery) Sources() []string { return srcs } -func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { +func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { var wg sync.WaitGroup wg.Add(len(dd.names)) for _, name := range dd.names { @@ -133,7 +133,7 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) { wg.Wait() } -func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error { +func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { response, err := lookupAll(name, dd.qtype) dnsSDLookupsCount.Inc() if err != nil { @@ -141,7 +141,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro return err } - tg := &config.TargetGroup{} + var tg config.TargetGroup for _, record := range response.Answer { target := model.LabelValue("") switch addr := record.(type) { diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 795355509e..193a68ab77 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -62,7 +62,7 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { } // Run implements the TargetProvider interface. -func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) ticker := time.NewTicker(ed.interval) @@ -73,7 +73,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- tg + ch <- *tg } for { @@ -83,7 +83,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- tg + ch <- *tg } case <-done: return diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index 44dc240aa0..eb0411b30f 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -103,7 +103,7 @@ func (fd *FileDiscovery) watchFiles() { } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) defer fd.stop() @@ -188,7 +188,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { +func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { ref := map[string]int{} for _, p := range fd.listFiles() { tgroups, err := readFile(p) @@ -199,7 +199,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { continue } for _, tg := range tgroups { - ch <- tg + ch <- *tg } ref[p] = len(tgroups) } @@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- &config.TargetGroup{Source: fileSource(f, i)} + ch <- config.TargetGroup{Source: fileSource(f, i)} } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 0335013b1e..74270ad029 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -26,7 +26,7 @@ func testFileSD(t *testing.T, ext string) { var ( fsd = NewFileDiscovery(&conf) - ch = make(chan *config.TargetGroup) + ch = make(chan config.TargetGroup) done = make(chan struct{}) ) go fsd.Run(ch, done) diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index dce1b9ccd0..5438f6f21d 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -173,25 +173,35 @@ func (kd *Discovery) Sources() []string { } // Run implements the TargetProvider interface. -func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) - select { - case ch <- kd.updateMastersTargetGroup(): - case <-done: - return + if tg := kd.updateMastersTargetGroup(); tg != nil { + select { + case ch <- *tg: + case <-done: + return + } } - select { - case ch <- kd.updateNodesTargetGroup(): - case <-done: - return + if tg := kd.updateNodesTargetGroup(); tg != nil { + select { + case ch <- *tg: + case <-done: + return + } } for _, ns := range kd.services { for _, service := range ns { + tg := kd.addService(service) + + if tg == nil { + continue + } + select { - case ch <- kd.addService(service): + case ch <- *tg: case <-done: return } @@ -223,8 +233,12 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { } } + if tg == nil { + continue + } + select { - case ch <- tg: + case ch <- *tg: case <-done: return } diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index ca89b91ea5..181ea440f4 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -53,7 +53,7 @@ func (md *MarathonDiscovery) Sources() []string { } // Run implements the TargetProvider interface. -func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for { @@ -69,7 +69,7 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan stru } } -func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error { +func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { targetMap, err := md.fetchTargetGroups() if err != nil { return err @@ -77,7 +77,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error // Update services which are still present for _, tg := range targetMap { - ch <- tg + ch <- *tg } // Remove services which did disappear @@ -85,7 +85,7 @@ func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error _, ok := targetMap[source] if !ok { log.Debugf("Removing group for %s", source) - ch <- &config.TargetGroup{Source: source} + ch <- config.TargetGroup{Source: source} } } diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index 6dd731e98c..f54d828077 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -26,8 +26,8 @@ import ( var marathonValidLabel = map[string]string{"prometheus": "yes"} -func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) { - ch := make(chan *config.TargetGroup) +func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { + ch := make(chan config.TargetGroup) md := NewMarathonDiscovery(&config.MarathonSDConfig{ Servers: []string{"http://localhost:8080"}, }) diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 0e12656165..21bdb6b047 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -67,7 +67,7 @@ type ServersetDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- *config.TargetGroup + sdUpdates *chan<- config.TargetGroup updates chan zookeeperTreeCacheEvent treeCaches []*zookeeperTreeCache } @@ -124,7 +124,7 @@ func (sd *ServersetDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- tg + *sd.sdUpdates <- *tg } } @@ -134,11 +134,11 @@ func (sd *ServersetDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { // Send on everything we have seen so far. sd.mu.Lock() for _, targetGroup := range sd.sources { - ch <- targetGroup + ch <- *targetGroup } // Tell processUpdates to send future updates. sd.sdUpdates = &ch diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 1e715d774f..880e6230c6 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -52,12 +52,12 @@ type fakeTargetProvider struct { update chan *config.TargetGroup } -func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (tp *fakeTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for { select { case tg := <-tp.update: - ch <- tg + ch <- *tg case <-done: return } diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 3f3a4377bd..f301b9a1b2 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -43,7 +43,7 @@ type TargetProvider interface { // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. - Run(up chan<- *config.TargetGroup, done <-chan struct{}) + Run(up chan<- config.TargetGroup, done <-chan struct{}) } // TargetManager maintains a set of targets, starts and stops their scraping and @@ -105,7 +105,7 @@ func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGr // targetGroupUpdate is a potentially changed/new target group // for the given scrape configuration. type targetGroupUpdate struct { - tg *config.TargetGroup + tg config.TargetGroup scfg *config.ScrapeConfig } @@ -126,9 +126,9 @@ func (tm *TargetManager) Run() { sources[src] = struct{}{} } - tgc := make(chan *config.TargetGroup) + tgc := make(chan config.TargetGroup) // Run the target provider after cleanup of the stale targets is done. - defer func(prov TargetProvider, tgc chan<- *config.TargetGroup, done <-chan struct{}) { + defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) { go prov.Run(tgc, done) }(prov, tgc, tm.done) @@ -140,9 +140,6 @@ func (tm *TargetManager) Run() { for { select { case tg := <-tgc: - if tg == nil { - break - } tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} case <-done: return @@ -179,12 +176,9 @@ func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan if !ok { return } - if update.tg == nil { - break - } log.Debugf("Received potential update for target group %q", update.tg.Source) - if err := tm.updateTargetGroup(update.tg, update.scfg); err != nil { + if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil { log.Errorf("Error updating targets: %s", err) } case <-done: @@ -382,10 +376,10 @@ func (tp *prefixedTargetProvider) Sources() []string { return srcs } -func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) - ch2 := make(chan *config.TargetGroup) + ch2 := make(chan config.TargetGroup) go tp.TargetProvider.Run(ch2, done) for { @@ -393,9 +387,6 @@ func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan case <-done: return case tg := <-ch2: - if tg == nil { - break - } tg.Source = tp.prefix(tg.Source) ch <- tg } @@ -537,14 +528,14 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { } // Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) { +func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { defer close(ch) for _, tg := range sd.TargetGroups { select { case <-done: return - case ch <- tg: + case ch <- *tg: } } <-done diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index aa0cb188a9..3f605f7693 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -52,7 +52,7 @@ func TestPrefixedTargetProvider(t *testing.T) { t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) } - ch := make(chan *config.TargetGroup) + ch := make(chan config.TargetGroup) done := make(chan struct{}) defer close(done) @@ -64,10 +64,10 @@ func TestPrefixedTargetProvider(t *testing.T) { expGroup2.Source = "job-x:static:123:1" // The static target provider sends on the channel once per target group. - if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) { + if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) { t.Fatalf("expected target group %v, got %v", expGroup1, tg) } - if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) { + if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) { t.Fatalf("expected target group %v, got %v", expGroup2, tg) } }