diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 84ed6d7ac..512008170 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -15,7 +15,6 @@ package discovery import ( "fmt" - "net/http" "strconv" "strings" "sync" @@ -24,6 +23,7 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -113,52 +113,24 @@ func NewConsulDiscovery(conf *config.ConsulSDConfig) (*ConsulDiscovery, error) { return cd, nil } -// Sources implements the TargetProvider interface. -func (cd *ConsulDiscovery) Sources() []string { - clientConf := *cd.clientConf - clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second} - - client, err := consul.NewClient(&clientConf) - if err != nil { - // NewClient always returns a nil error. - panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err)) - } - - srvs, _, err := client.Catalog().Services(nil) - if err != nil { - log.Errorf("Error refreshing service list: %s", err) - return nil - } - cd.mu.Lock() - defer cd.mu.Unlock() - - srcs := make([]string, 0, len(srvs)) - for name := range srvs { - if _, ok := cd.scrapedServices[name]; len(cd.scrapedServices) == 0 || ok { - srcs = append(srcs, name) - } - } - return srcs -} - // Run implements the TargetProvider interface. -func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (cd *ConsulDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) defer cd.stop() update := make(chan *consulService, 10) - go cd.watchServices(update, done) + go cd.watchServices(update, ctx.Done()) for { select { - case <-done: + case <-ctx.Done(): return case srv := <-update: if srv.removed { close(srv.done) // Send clearing update. - ch <- config.TargetGroup{Source: srv.name} + ch <- []*config.TargetGroup{{Source: srv.name}} break } // Launch watcher for the service. @@ -244,7 +216,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch // watchService retrieves updates about srv from Consul's service endpoint. // On a potential update the resulting target group is sent to ch. -func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) { +func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- []*config.TargetGroup) { catalog := cd.client.Catalog() for { nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ @@ -288,7 +260,11 @@ func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.Tar default: // Continue. } - ch <- srv.tgroup + // TODO(fabxc): do a copy for now to avoid races. The integration + // needs needs some general cleanup. + tg := srv.tgroup + ch <- []*config.TargetGroup{&tg} + cd.mu.Unlock() } } diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index 40a8841e1..7c744f8c0 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -91,7 +92,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery { } // Run implements the TargetProvider interface. 
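Note: every provider in this patch is converted to the same shape, so a minimal sketch of the assumed post-change TargetProvider contract may help when reading the hunks below. Only the Run signature is taken from the diff; the interface body and the staticProvider type are illustrative assumptions, not part of the patch.

```go
// Sketch only, assuming the retrieval package's TargetProvider interface now
// consists solely of the context-based Run method (Sources() is removed from
// every provider in this patch).
package retrieval

import (
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
)

// TargetProvider (assumed shape): a provider sends its initial state and all
// subsequent updates as batches of target groups and returns once the
// context is canceled.
type TargetProvider interface {
	Run(ctx context.Context, ch chan<- []*config.TargetGroup)
}

// staticProvider is a hypothetical provider following the same pattern as
// the providers in this diff: push one batch, then wait for cancelation.
type staticProvider struct {
	groups []*config.TargetGroup
}

func (sp *staticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
	defer close(ch)

	select {
	case ch <- sp.groups:
	case <-ctx.Done():
		return
	}
	<-ctx.Done()
}
```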
-func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (dd *DNSDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) ticker := time.NewTicker(dd.interval) @@ -104,23 +105,15 @@ func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) select { case <-ticker.C: dd.refreshAll(ch) - case <-done: + case <-ctx.Done(): return } } } -// Sources implements the TargetProvider interface. -func (dd *DNSDiscovery) Sources() []string { - var srcs []string - for _, name := range dd.names { - srcs = append(srcs, name) - } - return srcs -} - -func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { +func (dd *DNSDiscovery) refreshAll(ch chan<- []*config.TargetGroup) { var wg sync.WaitGroup + wg.Add(len(dd.names)) for _, name := range dd.names { go func(n string) { @@ -130,10 +123,11 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) { wg.Done() }(name) } + wg.Wait() } -func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error { +func (dd *DNSDiscovery) refresh(name string, ch chan<- []*config.TargetGroup) error { response, err := lookupAll(name, dd.qtype) dnsSDLookupsCount.Inc() if err != nil { @@ -141,7 +135,8 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error return err } - var tg config.TargetGroup + tg := &config.TargetGroup{} + for _, record := range response.Answer { target := model.LabelValue("") switch addr := record.(type) { @@ -166,7 +161,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error } tg.Source = name - ch <- tg + ch <- []*config.TargetGroup{tg} return nil } diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 46b3d371f..e39016948 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/aws/defaults" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/aws/aws-sdk-go/service/ec2" "github.com/prometheus/prometheus/config" @@ -46,7 +47,6 @@ const ( // the TargetProvider interface. type EC2Discovery struct { aws *aws.Config - done chan struct{} interval time.Duration port int } @@ -62,14 +62,13 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { Region: &conf.Region, Credentials: creds, }, - done: make(chan struct{}), interval: time.Duration(conf.RefreshInterval), port: conf.Port, } } // Run implements the TargetProvider interface. -func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) ticker := time.NewTicker(ed.interval) @@ -80,7 +79,7 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- *tg + ch <- []*config.TargetGroup{tg} } for { @@ -90,19 +89,14 @@ func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) if err != nil { log.Error(err) } else { - ch <- *tg + ch <- []*config.TargetGroup{tg} } - case <-done: + case <-ctx.Done(): return } } } -// Sources implements the TargetProvider interface. 
-func (ed *EC2Discovery) Sources() []string { - return []string{*ed.aws.Region} -} - func (ed *EC2Discovery) refresh() (*config.TargetGroup, error) { ec2s := ec2.New(ed.aws) tg := &config.TargetGroup{ diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index eb0411b30..9aedb1605 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "gopkg.in/fsnotify.v1" "gopkg.in/yaml.v2" @@ -53,23 +54,6 @@ func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery { } } -// Sources implements the TargetProvider interface. -func (fd *FileDiscovery) Sources() []string { - var srcs []string - // As we allow multiple target groups per file we have no choice - // but to parse them all. - for _, p := range fd.listFiles() { - tgroups, err := readFile(p) - if err != nil { - log.Errorf("Error reading file %q: %s", p, err) - } - for _, tg := range tgroups { - srcs = append(srcs, tg.Source) - } - } - return srcs -} - // listFiles returns a list of all files that match the configured patterns. func (fd *FileDiscovery) listFiles() []string { var paths []string @@ -103,7 +87,7 @@ func (fd *FileDiscovery) watchFiles() { } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) defer fd.stop() @@ -123,11 +107,11 @@ func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) // Stopping has priority over refreshing. Thus we wrap the actual select // clause to always catch done signals. select { - case <-done: + case <-ctx.Done(): return default: select { - case <-done: + case <-ctx.Done(): return case event := <-fd.watcher.Events: @@ -188,7 +172,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { +func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { ref := map[string]int{} for _, p := range fd.listFiles() { tgroups, err := readFile(p) @@ -198,9 +182,8 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { ref[p] = fd.lastRefresh[p] continue } - for _, tg := range tgroups { - ch <- *tg - } + ch <- tgroups + ref[p] = len(tgroups) } // Send empty updates for sources that disappeared. 
@@ -208,7 +191,9 @@ func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- config.TargetGroup{Source: fileSource(f, i)} + ch <- []*config.TargetGroup{ + {Source: fileSource(f, i)}, + } } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 4c1407666..2e1babce1 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" ) @@ -27,17 +28,17 @@ func testFileSD(t *testing.T, ext string) { conf.RefreshInterval = model.Duration(1 * time.Hour) var ( - fsd = NewFileDiscovery(&conf) - ch = make(chan config.TargetGroup) - done = make(chan struct{}) + fsd = NewFileDiscovery(&conf) + ch = make(chan []*config.TargetGroup) + ctx, cancel = context.WithCancel(context.Background()) ) - go fsd.Run(ch, done) + go fsd.Run(ctx, ch) select { case <-time.After(25 * time.Millisecond): // Expected. - case tg := <-ch: - t.Fatalf("Unexpected target group in file discovery: %s", tg) + case tgs := <-ch: + t.Fatalf("Unexpected target groups in file discovery: %s", tgs) } newf, err := os.Create("fixtures/_test" + ext) @@ -58,37 +59,37 @@ func testFileSD(t *testing.T, ext string) { } newf.Close() - // The files contain two target groups which are read and sent in order. + // The files contain two target groups. select { case <-time.After(15 * time.Second): t.Fatalf("Expected new target group but got none") - case tg := <-ch: + case tgs := <-ch: + tg := tgs[0] + if _, ok := tg.Labels["foo"]; !ok { t.Fatalf("Label not parsed") } if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) { t.Fatalf("Unexpected target group %s", tg) } - } - select { - case <-time.After(15 * time.Second): - t.Fatalf("Expected new target group but got none") - case tg := <-ch: + + tg = tgs[1] if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) { - t.Fatalf("Unexpected target group %s", tg) + t.Fatalf("Unexpected target groups %s", tg) } } + // Based on unknown circumstances, sometimes fsnotify will trigger more events in // some runs (which might be empty, chains of different operations etc.). // We have to drain those (as the target manager would) to avoid deadlocking and must // not try to make sense of it all... drained := make(chan struct{}) go func() { - for tg := range ch { + for tgs := range ch { // Below we will change the file to a bad syntax. Previously extracted target // groups must not be deleted via sending an empty target group. - if len(tg.Targets) == 0 { - t.Errorf("Unexpected empty target group received: %s", tg) + if len(tgs[0].Targets) == 0 { + t.Errorf("Unexpected empty target groups received: %s", tgs) } } close(drained) @@ -107,6 +108,6 @@ func testFileSD(t *testing.T, ext string) { os.Rename(newf.Name(), "fixtures/_test"+ext) - close(done) + cancel() <-drained } diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index a23a3e08e..fd7974f8f 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/httputil" @@ -94,75 +95,35 @@ func (kd *Discovery) Initialize() error { return nil } -// Sources implements the TargetProvider interface. 
-func (kd *Discovery) Sources() []string { - sourceNames := make([]string, 0, len(kd.apiServers)) - for _, apiServer := range kd.apiServers { - sourceNames = append(sourceNames, apiServersTargetGroupName+":"+apiServer.Host) - } - - nodes, _, err := kd.getNodes() - if err != nil { - // If we can't list nodes then we can't watch them. Assume this is a misconfiguration - // & log & return empty. - log.Errorf("Unable to initialize Kubernetes nodes: %s", err) - return []string{} - } - sourceNames = append(sourceNames, kd.nodeSources(nodes)...) - - services, _, err := kd.getServices() - if err != nil { - // If we can't list services then we can't watch them. Assume this is a misconfiguration - // & log & return empty. - log.Errorf("Unable to initialize Kubernetes services: %s", err) - return []string{} - } - sourceNames = append(sourceNames, kd.serviceSources(services)...) - - return sourceNames -} - -func (kd *Discovery) nodeSources(nodes map[string]*Node) []string { - var sourceNames []string - for name := range nodes { - sourceNames = append(sourceNames, nodesTargetGroupName+":"+name) - } - return sourceNames -} - -func (kd *Discovery) serviceSources(services map[string]map[string]*Service) []string { - var sourceNames []string - for _, ns := range services { - for _, service := range ns { - sourceNames = append(sourceNames, serviceSource(service)) - } - } - return sourceNames -} - // Run implements the TargetProvider interface. -func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) - if tg := kd.updateAPIServersTargetGroup(); tg != nil { - select { - case ch <- *tg: - case <-done: - return - } + // Send an initial full view. + // TODO(fabxc): this does not include all available services and service + // endpoints yet. Service endpoints were also missing in the previous Sources() method. + var all []*config.TargetGroup + + all = append(all, kd.updateAPIServersTargetGroup()) + all = append(all, kd.updateNodesTargetGroup()) + + select { + case ch <- all: + case <-ctx.Done(): + return } retryInterval := time.Duration(kd.Conf.RetryInterval) update := make(chan interface{}, 10) - go kd.watchNodes(update, done, retryInterval) - go kd.startServiceWatch(update, done, retryInterval) + go kd.watchNodes(update, ctx.Done(), retryInterval) + go kd.startServiceWatch(update, ctx.Done(), retryInterval) var tg *config.TargetGroup for { select { - case <-done: + case <-ctx.Done(): return case event := <-update: switch obj := event.(type) { @@ -181,8 +142,8 @@ func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { } select { - case ch <- *tg: - case <-done: + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): return } } diff --git a/retrieval/discovery/marathon.go b/retrieval/discovery/marathon.go index 181ea440f..6361a7873 100644 --- a/retrieval/discovery/marathon.go +++ b/retrieval/discovery/marathon.go @@ -17,6 +17,8 @@ import ( "time" "github.com/prometheus/common/log" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/marathon" ) @@ -40,25 +42,13 @@ func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery { } } -// Sources implements the TargetProvider interface. 
-func (md *MarathonDiscovery) Sources() []string { - var sources []string - tgroups, err := md.fetchTargetGroups() - if err == nil { - for source := range tgroups { - sources = append(sources, source) - } - } - return sources -} - // Run implements the TargetProvider interface. -func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (md *MarathonDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer close(ch) for { select { - case <-done: + case <-ctx.Done(): return case <-time.After(md.refreshInterval): err := md.updateServices(ch) @@ -69,23 +59,24 @@ func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struc } } -func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error { +func (md *MarathonDiscovery) updateServices(ch chan<- []*config.TargetGroup) error { targetMap, err := md.fetchTargetGroups() if err != nil { return err } - // Update services which are still present + all := make([]*config.TargetGroup, 0, len(targetMap)) for _, tg := range targetMap { - ch <- *tg + all = append(all, tg) } + ch <- all // Remove services which did disappear for source := range md.lastRefresh { _, ok := targetMap[source] if !ok { log.Debugf("Removing group for %s", source) - ch <- config.TargetGroup{Source: source} + ch <- []*config.TargetGroup{{Source: source}} } } diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index f54d82807..cc58a1380 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery/marathon" @@ -26,8 +27,8 @@ import ( var marathonValidLabel = map[string]string{"prometheus": "yes"} -func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) { - ch := make(chan config.TargetGroup) +func newTestDiscovery(client marathon.AppListClient) (chan []*config.TargetGroup, *MarathonDiscovery) { + ch := make(chan []*config.TargetGroup) md := NewMarathonDiscovery(&config.MarathonSDConfig{ Servers: []string{"http://localhost:8080"}, }) @@ -60,7 +61,9 @@ func TestMarathonSDEmptyList(t *testing.T) { go func() { select { case tg := <-ch: - t.Fatalf("Got group: %v", tg) + if len(tg) > 0 { + t.Fatalf("Got group: %v", tg) + } default: } }() @@ -96,7 +99,9 @@ func TestMarathonSDSendGroup(t *testing.T) { }) go func() { select { - case tg := <-ch: + case tgs := <-ch: + tg := tgs[0] + if tg.Source != "test-service" { t.Fatalf("Wrong target group name: %s", tg.Source) } @@ -121,9 +126,10 @@ func TestMarathonSDRemoveApp(t *testing.T) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { return marathonTestAppList(marathonValidLabel, 1), nil }) + go func() { - up1 := <-ch - up2 := <-ch + up1 := (<-ch)[0] + up2 := (<-ch)[0] if up2.Source != up1.Source { t.Fatalf("Source is different: %s", up2) if len(up2.Targets) > 0 { @@ -145,33 +151,25 @@ func TestMarathonSDRemoveApp(t *testing.T) { } } -func TestMarathonSDSources(t *testing.T) { - _, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { - return marathonTestAppList(marathonValidLabel, 1), nil - }) - sources := md.Sources() - if len(sources) != 1 { - t.Fatalf("Wrong number of sources: %s", sources) - } -} - func TestMarathonSDRunAndStop(t *testing.T) { ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) { 
return marathonTestAppList(marathonValidLabel, 1), nil }) md.refreshInterval = time.Millisecond * 10 - done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { select { case <-ch: - close(done) + cancel() case <-time.After(md.refreshInterval * 3): - close(done) + cancel() t.Fatalf("Update took too long.") } }() - md.Run(ch, done) + + md.Run(ctx, ch) + select { case <-ch: default: diff --git a/retrieval/discovery/nerve.go b/retrieval/discovery/nerve.go index d23416062..f99c9d61f 100644 --- a/retrieval/discovery/nerve.go +++ b/retrieval/discovery/nerve.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/common/model" "github.com/samuel/go-zookeeper/zk" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/treecache" @@ -47,7 +48,7 @@ type NerveDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- config.TargetGroup + sdUpdates *chan<- []*config.TargetGroup updates chan treecache.ZookeeperTreeCacheEvent treeCaches []*treecache.ZookeeperTreeCache } @@ -73,17 +74,6 @@ func NewNerveDiscovery(conf *config.NerveSDConfig) *NerveDiscovery { return sd } -// Sources implements the TargetProvider interface. -func (sd *NerveDiscovery) Sources() []string { - sd.mu.RLock() - defer sd.mu.RUnlock() - srcs := []string{} - for t := range sd.sources { - srcs = append(srcs, t) - } - return srcs -} - func (sd *NerveDiscovery) processUpdates() { defer sd.conn.Close() for event := range sd.updates { @@ -104,7 +94,7 @@ func (sd *NerveDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- *tg + *sd.sdUpdates <- []*config.TargetGroup{tg} } } @@ -114,17 +104,22 @@ func (sd *NerveDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *NerveDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (sd *NerveDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Send on everything we have seen so far. sd.mu.Lock() - for _, targetGroup := range sd.sources { - ch <- *targetGroup + + all := make([]*config.TargetGroup, 0, len(sd.sources)) + + for _, tg := range sd.sources { + all = append(all, tg) } + ch <- all + // Tell processUpdates to send future updates. sd.sdUpdates = &ch sd.mu.Unlock() - <-done + <-ctx.Done() for _, tc := range sd.treeCaches { tc.Stop() } diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index d1172b243..5a13af63c 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/samuel/go-zookeeper/zk" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/util/strutil" @@ -57,7 +58,7 @@ type ServersetDiscovery struct { conn *zk.Conn mu sync.RWMutex sources map[string]*config.TargetGroup - sdUpdates *chan<- config.TargetGroup + sdUpdates *chan<- []*config.TargetGroup updates chan treecache.ZookeeperTreeCacheEvent treeCaches []*treecache.ZookeeperTreeCache } @@ -83,17 +84,6 @@ func NewServersetDiscovery(conf *config.ServersetSDConfig) *ServersetDiscovery { return sd } -// Sources implements the TargetProvider interface. 
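All providers now deliver full batches and single-group updates on the same channel, and a group with an empty Targets list acts as a clearing update for its Source. Below is a hedged sketch of how a consumer could fold such batches into its view; the mergeGroups helper and the map keyed by Source are illustrative assumptions, since the real target manager side is not part of this section.

```go
// Illustrative consumer-side sketch, not part of the patch: fold provider
// updates into a map keyed by TargetGroup.Source. A group without targets
// clears its source, matching the clearing updates sent by the Consul,
// file, and Marathon providers in this diff.
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

func mergeGroups(state map[string]*config.TargetGroup, update []*config.TargetGroup) {
	for _, tg := range update {
		if tg == nil {
			continue
		}
		if len(tg.Targets) == 0 {
			delete(state, tg.Source) // clearing update: the source disappeared
			continue
		}
		state[tg.Source] = tg
	}
}

func main() {
	state := map[string]*config.TargetGroup{}

	// A provider announces one group ...
	mergeGroups(state, []*config.TargetGroup{{
		Source:  "svc-a",
		Targets: []model.LabelSet{{model.AddressLabel: "10.0.0.1:9100"}},
	}})
	// ... and later clears it again.
	mergeGroups(state, []*config.TargetGroup{{Source: "svc-a"}})

	fmt.Println(len(state)) // 0
}
```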
-func (sd *ServersetDiscovery) Sources() []string { - sd.mu.RLock() - defer sd.mu.RUnlock() - srcs := []string{} - for t := range sd.sources { - srcs = append(srcs, t) - } - return srcs -} - func (sd *ServersetDiscovery) processUpdates() { defer sd.conn.Close() for event := range sd.updates { @@ -114,7 +104,7 @@ func (sd *ServersetDiscovery) processUpdates() { } sd.mu.Unlock() if sd.sdUpdates != nil { - *sd.sdUpdates <- *tg + *sd.sdUpdates <- []*config.TargetGroup{tg} } } @@ -124,17 +114,22 @@ func (sd *ServersetDiscovery) processUpdates() { } // Run implements the TargetProvider interface. -func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { +func (sd *ServersetDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Send on everything we have seen so far. sd.mu.Lock() - for _, targetGroup := range sd.sources { - ch <- *targetGroup + + all := make([]*config.TargetGroup, 0, len(sd.sources)) + + for _, tg := range sd.sources { + all = append(all, tg) } + ch <- all + // Tell processUpdates to send future updates. sd.sdUpdates = &ch sd.mu.Unlock() - <-done + <-ctx.Done() for _, tc := range sd.treeCaches { tc.Stop() } @@ -142,8 +137,8 @@ func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan stru func parseServersetMember(data []byte, path string) (*model.LabelSet, error) { member := serversetMember{} - err := json.Unmarshal(data, &member) - if err != nil { + + if err := json.Unmarshal(data, &member); err != nil { return nil, fmt.Errorf("error unmarshaling serverset member %q: %s", path, err) } diff --git a/retrieval/scrape.go b/retrieval/scrape.go new file mode 100644 index 000000000..3697a2216 --- /dev/null +++ b/retrieval/scrape.go @@ -0,0 +1,450 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "errors" + "fmt" + "io" + "net/http" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" +) + +const ( + scrapeHealthMetricName = "up" + scrapeDurationMetricName = "scrape_duration_seconds" + + // Capacity of the channel to buffer samples during ingestion. + ingestedSamplesCap = 256 + + // Constants for instrumentation. 
+ namespace = "prometheus" + interval = "interval" +) + +var ( + errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") + + targetIntervalLength = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "target_interval_length_seconds", + Help: "Actual intervals between scrapes.", + Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, + }, + []string{interval}, + ) + targetSkippedScrapes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "target_skipped_scrapes_total", + Help: "Total number of scrapes that were skipped because the metric storage was throttled.", + }, + []string{interval}, + ) +) + +func init() { + prometheus.MustRegister(targetIntervalLength) + prometheus.MustRegister(targetSkippedScrapes) +} + +// scrapePool manages scrapes for sets of targets. +type scrapePool struct { + appender storage.SampleAppender + + ctx context.Context + + mtx sync.RWMutex + config *config.ScrapeConfig + client *http.Client + // Targets and loops must always be synchronized to have the same + // set of hashes. + targets map[uint64]*Target + loops map[uint64]loop + + // Constructor for new scrape loops. This is settable for testing convenience. + newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop +} + +func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { + client, err := newHTTPClient(cfg) + if err != nil { + // Any errors that could occur here should be caught during config validation. + log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) + } + return &scrapePool{ + appender: app, + config: cfg, + client: client, + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + newLoop: newScrapeLoop, + } +} + +// stop terminates all scrape loops and returns after they all terminated. +func (sp *scrapePool) stop() { + var wg sync.WaitGroup + + sp.mtx.Lock() + defer sp.mtx.Unlock() + + for fp, l := range sp.loops { + wg.Add(1) + + go func(l loop) { + l.stop() + wg.Done() + }(l) + + delete(sp.loops, fp) + delete(sp.targets, fp) + } + + wg.Wait() +} + +// reload the scrape pool with the given scrape configuration. The target state is preserved +// but all scrape loops are restarted with the new scrape configuration. +// This method returns after all scrape loops that were stopped have fully terminated. +func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { + sp.mtx.Lock() + defer sp.mtx.Unlock() + + client, err := newHTTPClient(cfg) + if err != nil { + // Any errors that could occur here should be caught during config validation. + log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) + } + sp.config = cfg + sp.client = client + + var ( + wg sync.WaitGroup + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) + + for fp, oldLoop := range sp.loops { + var ( + t = sp.targets[fp] + s = &targetScraper{Target: t, client: sp.client} + newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + ) + wg.Add(1) + + go func(oldLoop, newLoop loop) { + oldLoop.stop() + wg.Done() + + go newLoop.run(interval, timeout, nil) + }(oldLoop, newLoop) + + sp.loops[fp] = newLoop + } + + wg.Wait() +} + +// sync takes a list of potentially duplicated targets, deduplicates them, starts +// scrape loops for new targets, and stops scrape loops for disappeared targets. 
+// It returns after all stopped scrape loops terminated. +func (sp *scrapePool) sync(targets []*Target) { + sp.mtx.Lock() + defer sp.mtx.Unlock() + + var ( + uniqueTargets = map[uint64]struct{}{} + interval = time.Duration(sp.config.ScrapeInterval) + timeout = time.Duration(sp.config.ScrapeTimeout) + ) + + for _, t := range targets { + hash := t.hash() + uniqueTargets[hash] = struct{}{} + + if _, ok := sp.targets[hash]; !ok { + s := &targetScraper{Target: t, client: sp.client} + l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + + sp.targets[hash] = t + sp.loops[hash] = l + + go l.run(interval, timeout, nil) + } + } + + var wg sync.WaitGroup + + // Stop and remove old targets and scraper loops. + for hash := range sp.targets { + if _, ok := uniqueTargets[hash]; !ok { + wg.Add(1) + go func(l loop) { + l.stop() + wg.Done() + }(sp.loops[hash]) + + delete(sp.loops, hash) + delete(sp.targets, hash) + } + } + + // Wait for all potentially stopped scrapers to terminate. + // This covers the case of flapping targets. If the server is under high load, a new scraper + // may be active and tries to insert. The old scraper that didn't terminate yet could still + // be inserting a previous sample set. + wg.Wait() +} + +// sampleAppender returns an appender for ingested samples from the target. +func (sp *scrapePool) sampleAppender(target *Target) storage.SampleAppender { + app := sp.appender + // The relabelAppender has to be inside the label-modifying appenders + // so the relabeling rules are applied to the correct label set. + if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 { + app = relabelAppender{ + SampleAppender: app, + relabelings: mrc, + } + } + + if sp.config.HonorLabels { + app = honorLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } else { + app = ruleLabelsAppender{ + SampleAppender: app, + labels: target.Labels(), + } + } + return app +} + +// reportAppender returns an appender for reporting samples for the target. +func (sp *scrapePool) reportAppender(target *Target) storage.SampleAppender { + return ruleLabelsAppender{ + SampleAppender: sp.appender, + labels: target.Labels(), + } +} + +// A scraper retrieves samples and accepts a status report at the end. +type scraper interface { + scrape(ctx context.Context, ts time.Time) (model.Samples, error) + report(start time.Time, dur time.Duration, err error) + offset(interval time.Duration) time.Duration +} + +// targetScraper implements the scraper interface for a target. 
+type targetScraper struct { + *Target + client *http.Client +} + +const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` + +func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) { + req, err := http.NewRequest("GET", s.URL().String(), nil) + if err != nil { + return nil, err + } + req.Header.Add("Accept", acceptHeader) + + resp, err := ctxhttp.Do(ctx, s.client, req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("server returned HTTP status %s", resp.Status) + } + + var ( + allSamples = make(model.Samples, 0, 200) + decSamples = make(model.Vector, 0, 50) + ) + sdec := expfmt.SampleDecoder{ + Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)), + Opts: &expfmt.DecodeOptions{ + Timestamp: model.TimeFromUnixNano(ts.UnixNano()), + }, + } + + for { + if err = sdec.Decode(&decSamples); err != nil { + break + } + allSamples = append(allSamples, decSamples...) + decSamples = decSamples[:0] + } + + if err == io.EOF { + // Set err to nil since it is used in the scrape health recording. + err = nil + } + return allSamples, err +} + +// A loop can run and be stopped again. It must not be reused after it was stopped. +type loop interface { + run(interval, timeout time.Duration, errc chan<- error) + stop() +} + +type scrapeLoop struct { + scraper scraper + + appender storage.SampleAppender + reportAppender storage.SampleAppender + + done chan struct{} + ctx context.Context + cancel func() +} + +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.SampleAppender) loop { + sl := &scrapeLoop{ + scraper: sc, + appender: app, + reportAppender: reportApp, + done: make(chan struct{}), + } + sl.ctx, sl.cancel = context.WithCancel(ctx) + + return sl +} + +func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { + defer close(sl.done) + + select { + case <-time.After(sl.scraper.offset(interval)): + // Continue after a scraping offset. + case <-sl.ctx.Done(): + return + } + + var last time.Time + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-sl.ctx.Done(): + return + default: + } + + if !sl.appender.NeedsThrottling() { + var ( + start = time.Now() + scrapeCtx, _ = context.WithTimeout(sl.ctx, timeout) + ) + + // Only record after the first scrape. + if !last.IsZero() { + targetIntervalLength.WithLabelValues(interval.String()).Observe( + float64(time.Since(last)) / float64(time.Second), // Sub-second precision. 
+ ) + } + + samples, err := sl.scraper.scrape(scrapeCtx, start) + if err == nil { + sl.append(samples) + } else if errc != nil { + errc <- err + } + + sl.report(start, time.Since(start), err) + last = start + } else { + targetSkippedScrapes.WithLabelValues(interval.String()).Inc() + } + + select { + case <-sl.ctx.Done(): + return + case <-ticker.C: + } + } +} + +func (sl *scrapeLoop) stop() { + sl.cancel() + <-sl.done +} + +func (sl *scrapeLoop) append(samples model.Samples) { + numOutOfOrder := 0 + + for _, s := range samples { + if err := sl.appender.Append(s); err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + log.Warnf("Error inserting sample: %s", err) + } + } + } + if numOutOfOrder > 0 { + log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") + } +} + +func (sl *scrapeLoop) report(start time.Time, duration time.Duration, err error) { + sl.scraper.report(start, duration, err) + + ts := model.TimeFromUnixNano(start.UnixNano()) + + var health model.SampleValue + if err == nil { + health = 1 + } + + healthSample := &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: scrapeHealthMetricName, + }, + Timestamp: ts, + Value: health, + } + durationSample := &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: scrapeDurationMetricName, + }, + Timestamp: ts, + Value: model.SampleValue(float64(duration) / float64(time.Second)), + } + + sl.reportAppender.Append(healthSample) + sl.reportAppender.Append(durationSample) +} diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go new file mode 100644 index 000000000..7ea1eba5a --- /dev/null +++ b/retrieval/scrape_test.go @@ -0,0 +1,587 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
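The new scrapePool is only exercised indirectly by the tests that follow, so here is a hedged lifecycle sketch of how its unexported methods are intended to be called in sequence (create, sync a target set, reload the configuration, stop). The appender, the config values, and the way sp.ctx is populated are assumptions; the excerpt does not show who sets the pool's context.

```go
// Illustrative sketch in package retrieval (sync/reload/stop are unexported).
package retrieval

import (
	"time"

	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
)

// noopAppender implements the two SampleAppender methods the code above
// relies on, similar to the nopAppender used by the tests.
type noopAppender struct{}

func (noopAppender) Append(*model.Sample) error { return nil }
func (noopAppender) NeedsThrottling() bool      { return false }

func exampleScrapePoolLifecycle(targets []*Target) {
	cfg := &config.ScrapeConfig{
		JobName:        "example",
		ScrapeInterval: model.Duration(15 * time.Second),
		ScrapeTimeout:  model.Duration(10 * time.Second),
	}

	sp := newScrapePool(cfg, noopAppender{})
	sp.ctx = context.Background() // normally supplied by the caller (assumption)

	// Start loops for new targets, stop loops for targets that disappeared.
	sp.sync(targets)

	// Restart every loop with a new configuration; target state is preserved.
	sp.reload(&config.ScrapeConfig{
		JobName:        "example",
		ScrapeInterval: model.Duration(30 * time.Second),
		ScrapeTimeout:  model.Duration(10 * time.Second),
	})

	// Terminate all loops and wait for them to finish.
	sp.stop()
}
```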
+ +package retrieval + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/common/model" + "golang.org/x/net/context" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" +) + +func TestNewScrapePool(t *testing.T) { + var ( + app = &nopAppender{} + cfg = &config.ScrapeConfig{} + sp = newScrapePool(cfg, app) + ) + + if a, ok := sp.appender.(*nopAppender); !ok || a != app { + t.Fatalf("Wrong sample appender") + } + if sp.config != cfg { + t.Fatalf("Wrong scrape config") + } + if sp.newLoop == nil { + t.Fatalf("newLoop function not initialized") + } +} + +type testLoop struct { + startFunc func(interval, timeout time.Duration, errc chan<- error) + stopFunc func() +} + +func (l *testLoop) run(interval, timeout time.Duration, errc chan<- error) { + l.startFunc(interval, timeout, errc) +} + +func (l *testLoop) stop() { + l.stopFunc() +} + +func TestScrapePoolStop(t *testing.T) { + sp := &scrapePool{ + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + } + var mtx sync.Mutex + stopped := map[uint64]bool{} + numTargets := 20 + + // Stopping the scrape pool must call stop() on all scrape loops, + // clean them and the respective targets up. It must wait until each loop's + // stop function returned before returning itself. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.hash()] = true + mtx.Unlock() + } + + sp.targets[t.hash()] = t + sp.loops[t.hash()] = l + } + + done := make(chan struct{}) + stopTime := time.Now() + + go func() { + sp.stop() + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.stop() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", len(stopped)) + } + mtx.Unlock() + + if len(sp.targets) > 0 { + t.Fatalf("Targets were not cleared on stopping: %d left", len(sp.targets)) + } + if len(sp.loops) > 0 { + t.Fatalf("Loops were not cleared on stopping: %d left", len(sp.loops)) + } +} + +func TestScrapePoolReload(t *testing.T) { + var mtx sync.Mutex + numTargets := 20 + + stopped := map[uint64]bool{} + + reloadCfg := &config.ScrapeConfig{ + ScrapeInterval: model.Duration(3 * time.Second), + ScrapeTimeout: model.Duration(2 * time.Second), + } + // On starting to run, new loops created on reload check whether their preceeding + // equivalents have been stopped. 
+ newLoop := func(ctx context.Context, s scraper, app, reportApp storage.SampleAppender) loop { + l := &testLoop{} + l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { + if interval != 3*time.Second { + t.Errorf("Expected scrape interval %d but got %d", 3*time.Second, interval) + } + if timeout != 2*time.Second { + t.Errorf("Expected scrape timeout %d but got %d", 2*time.Second, timeout) + } + mtx.Lock() + if !stopped[s.(*targetScraper).hash()] { + t.Errorf("Scrape loop for %v not stopped yet", s.(*targetScraper)) + } + mtx.Unlock() + } + return l + } + sp := &scrapePool{ + targets: map[uint64]*Target{}, + loops: map[uint64]loop{}, + newLoop: newLoop, + } + + // Reloading a scrape pool with a new scrape configuration must stop all scrape + // loops and start new ones. A new loop must not be started before the preceeding + // one terminated. + + for i := 0; i < numTargets; i++ { + t := &Target{ + labels: model.LabelSet{ + model.AddressLabel: model.LabelValue(fmt.Sprintf("example.com:%d", i)), + }, + } + l := &testLoop{} + l.stopFunc = func() { + time.Sleep(time.Duration(i*20) * time.Millisecond) + + mtx.Lock() + stopped[t.hash()] = true + mtx.Unlock() + } + + sp.targets[t.hash()] = t + sp.loops[t.hash()] = l + } + done := make(chan struct{}) + + beforeTargets := map[uint64]*Target{} + for h, t := range sp.targets { + beforeTargets[h] = t + } + + reloadTime := time.Now() + + go func() { + sp.reload(reloadCfg) + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("scrapeLoop.reload() did not return as expected") + case <-done: + // This should have taken at least as long as the last target slept. + if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond { + t.Fatalf("scrapeLoop.stop() exited before all targets stopped") + } + } + + mtx.Lock() + if len(stopped) != numTargets { + t.Fatalf("Expected 20 stopped loops, got %d", stopped) + } + mtx.Unlock() + + if !reflect.DeepEqual(sp.targets, beforeTargets) { + t.Fatalf("Reloading affected target states unexpectedly") + } + if len(sp.loops) != numTargets { + t.Fatalf("Expected %d loops after reload but got %d", numTargets, len(sp.loops)) + } +} + +func TestScrapePoolReportAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.reportAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if rl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", rl.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.reportAppender(target) + + hl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + if hl.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", hl.SampleAppender) + } +} + +func TestScrapePoolSampleAppender(t *testing.T) { + cfg := &config.ScrapeConfig{ + MetricRelabelConfigs: []*config.RelabelConfig{ + {}, {}, {}, + }, + } + + target := newTestTarget("example.com:80", 10*time.Millisecond, nil) + app := &nopAppender{} + + sp := newScrapePool(cfg, app) + + cfg.HonorLabels = false + wrapped := sp.sampleAppender(target) + + rl, ok := wrapped.(ruleLabelsAppender) + if !ok { + t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) + } + re, ok := 
rl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } + + cfg.HonorLabels = true + wrapped = sp.sampleAppender(target) + + hl, ok := wrapped.(honorLabelsAppender) + if !ok { + t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) + } + re, ok = hl.SampleAppender.(relabelAppender) + if !ok { + t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) + } + if re.SampleAppender != app { + t.Fatalf("Expected base appender but got %T", re.SampleAppender) + } +} + +func TestScrapeLoopStop(t *testing.T) { + scraper := &testScraper{} + sl := newScrapeLoop(context.Background(), scraper, nil, nil) + + // The scrape pool synchronizes on stopping scrape loops. However, new scrape + // loops are syarted asynchronously. Thus it's possible, that a loop is stopped + // again before having started properly. + // Stopping not-yet-started loops must block until the run method was called and exited. + // The run method must exit immediately. + + stopDone := make(chan struct{}) + go func() { + sl.stop() + close(stopDone) + }() + + select { + case <-stopDone: + t.Fatalf("Stopping terminated before run exited successfully") + case <-time.After(500 * time.Millisecond): + } + + // Running the scrape loop must exit before calling the scraper even once. + scraper.scrapeFunc = func(context.Context, time.Time) (model.Samples, error) { + t.Fatalf("scraper was called for terminated scrape loop") + return nil, nil + } + + runDone := make(chan struct{}) + go func() { + sl.run(0, 0, nil) + close(runDone) + }() + + select { + case <-runDone: + case <-time.After(1 * time.Second): + t.Fatalf("Running terminated scrape loop did not exit") + } + + select { + case <-stopDone: + case <-time.After(1 * time.Second): + t.Fatalf("Stopping did not terminate after running exited") + } +} + +func TestScrapeLoopRun(t *testing.T) { + var ( + signal = make(chan struct{}) + errc = make(chan error) + + scraper = &testScraper{} + app = &nopAppender{} + reportApp = &nopAppender{} + ) + defer close(signal) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, scraper, app, reportApp) + + // The loop must terminate during the initial offset if the context + // is canceled. + scraper.offsetDur = time.Hour + + go func() { + sl.run(time.Second, time.Hour, errc) + signal <- struct{}{} + }() + + // Wait to make sure we are actually waiting on the offset. + time.Sleep(1 * time.Second) + + cancel() + select { + case <-signal: + case <-time.After(5 * time.Second): + t.Fatalf("Cancelation during initial offset failed") + case err := <-errc: + t.Fatalf("Unexpected error: %s", err) + } + + // The provided timeout must cause cancelation of the context passed down to the + // scraper. The scraper has to respect the context. 
+ scraper.offsetDur = 0 + + block := make(chan struct{}) + scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (model.Samples, error) { + select { + case <-block: + case <-ctx.Done(): + return nil, ctx.Err() + } + return nil, nil + } + + ctx, cancel = context.WithCancel(context.Background()) + sl = newScrapeLoop(ctx, scraper, app, reportApp) + + go func() { + sl.run(time.Second, 100*time.Millisecond, errc) + signal <- struct{}{} + }() + + select { + case err := <-errc: + if err != context.DeadlineExceeded { + t.Fatalf("Expected timeout error but got: %s", err) + } + case <-time.After(3 * time.Second): + t.Fatalf("Expected timeout error but got none") + } + + // We already caught the timeout error and are certainly in the loop. + // Let the scrapes returns immediately to cause no further timeout errors + // and check whether canceling the parent context terminates the loop. + close(block) + cancel() + + select { + case <-signal: + // Loop terminated as expected. + case err := <-errc: + t.Fatalf("Unexpected error: %s", err) + case <-time.After(3 * time.Second): + t.Fatalf("Loop did not terminate on context cancelation") + } +} + +func TestTargetScraperScrapeOK(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", `text/plain; version=0.0.4`) + w.Write([]byte("metric_a 1\nmetric_b 2\n")) + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + if err != nil { + panic(err) + } + + ts := &targetScraper{ + Target: &Target{ + labels: model.LabelSet{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }, + }, + client: http.DefaultClient, + } + now := time.Now() + + samples, err := ts.scrape(context.Background(), now) + if err != nil { + t.Fatalf("Unexpected scrape error: %s", err) + } + + expectedSamples := model.Samples{ + { + Metric: model.Metric{"__name__": "metric_a"}, + Timestamp: model.TimeFromUnixNano(now.UnixNano()), + Value: 1, + }, + { + Metric: model.Metric{"__name__": "metric_b"}, + Timestamp: model.TimeFromUnixNano(now.UnixNano()), + Value: 2, + }, + } + + if !reflect.DeepEqual(samples, expectedSamples) { + t.Errorf("Scraped samples did not match served metrics") + t.Errorf("Expected: %v", expectedSamples) + t.Fatalf("Got: %v", samples) + } +} + +func TestTargetScrapeScrapeCancel(t *testing.T) { + block := make(chan struct{}) + + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-block + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + if err != nil { + panic(err) + } + + ts := &targetScraper{ + Target: &Target{ + labels: model.LabelSet{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }, + }, + client: http.DefaultClient, + } + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + + go func() { + time.Sleep(1 * time.Second) + cancel() + }() + + go func() { + if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled { + t.Fatalf("Expected context cancelation error but got: %s", err) + } + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Scrape function did not return unexpectedly") + case <-done: + } + // If this is closed in a defer above the function the test server + // does not terminate and the test doens't complete. 
+ close(block) +} + +func TestTargetScrapeScrapeNotFound(t *testing.T) { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + }), + ) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + if err != nil { + panic(err) + } + + ts := &targetScraper{ + Target: &Target{ + labels: model.LabelSet{ + model.SchemeLabel: model.LabelValue(serverURL.Scheme), + model.AddressLabel: model.LabelValue(serverURL.Host), + }, + }, + client: http.DefaultClient, + } + + if _, err := ts.scrape(context.Background(), time.Now()); !strings.Contains(err.Error(), "404") { + t.Fatalf("Expected \"404 NotFound\" error but got: %s", err) + } +} + +// testScraper implements the scraper interface and allows setting values +// returned by its methods. It also allows setting a custom scrape function. +type testScraper struct { + offsetDur time.Duration + + lastStart time.Time + lastDuration time.Duration + lastError error + + samples model.Samples + scrapeErr error + scrapeFunc func(context.Context, time.Time) (model.Samples, error) +} + +func (ts *testScraper) offset(interval time.Duration) time.Duration { + return ts.offsetDur +} + +func (ts *testScraper) report(start time.Time, duration time.Duration, err error) { + ts.lastStart = start + ts.lastDuration = duration + ts.lastError = err +} + +func (ts *testScraper) scrape(ctx context.Context, t time.Time) (model.Samples, error) { + if ts.scrapeFunc != nil { + return ts.scrapeFunc(ctx, t) + } + return ts.samples, ts.scrapeErr +} diff --git a/retrieval/target.go b/retrieval/target.go index bf00f7ca7..65d5ba123 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -14,9 +14,8 @@ package retrieval import ( - "errors" "fmt" - "io" + "hash/fnv" "io/ioutil" "net/http" "net/url" @@ -24,200 +23,46 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/log" "github.com/prometheus/common/model" - "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) -const ( - scrapeHealthMetricName = "up" - scrapeDurationMetricName = "scrape_duration_seconds" - - // Capacity of the channel to buffer samples during ingestion. - ingestedSamplesCap = 256 - - // Constants for instrumentation. - namespace = "prometheus" - interval = "interval" -) - -var ( - errSkippedScrape = errors.New("scrape skipped due to throttled ingestion") - - targetIntervalLength = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: namespace, - Name: "target_interval_length_seconds", - Help: "Actual intervals between scrapes.", - Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, - }, - []string{interval}, - ) - targetSkippedScrapes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "target_skipped_scrapes_total", - Help: "Total number of scrapes that were skipped because the metric storage was throttled.", - }, - []string{interval}, - ) -) - -func init() { - prometheus.MustRegister(targetIntervalLength) - prometheus.MustRegister(targetSkippedScrapes) -} - // TargetHealth describes the health state of a target. 
-type TargetHealth int - -func (t TargetHealth) String() string { - switch t { - case HealthUnknown: - return "unknown" - case HealthGood: - return "up" - case HealthBad: - return "down" - } - panic("unknown state") -} - -func (t TargetHealth) value() model.SampleValue { - if t == HealthGood { - return 1 - } - return 0 -} +type TargetHealth string +// The possible health states of a target based on the last performed scrape. const ( - // HealthUnknown is the state of a Target before it is first scraped. - HealthUnknown TargetHealth = iota - // HealthGood is the state of a Target that has been successfully scraped. - HealthGood - // HealthBad is the state of a Target that was scraped unsuccessfully. - HealthBad + HealthUnknown TargetHealth = "unknown" + HealthGood TargetHealth = "up" + HealthBad TargetHealth = "down" ) -// TargetStatus contains information about the current status of a scrape target. -type TargetStatus struct { - lastError error - lastScrape time.Time - health TargetHealth - - mu sync.RWMutex -} - -// LastError returns the error encountered during the last scrape. -func (ts *TargetStatus) LastError() error { - ts.mu.RLock() - defer ts.mu.RUnlock() - - return ts.lastError -} - -// LastScrape returns the time of the last scrape. -func (ts *TargetStatus) LastScrape() time.Time { - ts.mu.RLock() - defer ts.mu.RUnlock() - - return ts.lastScrape -} - -// Health returns the last known health state of the target. -func (ts *TargetStatus) Health() TargetHealth { - ts.mu.RLock() - defer ts.mu.RUnlock() - - return ts.health -} - -func (ts *TargetStatus) setLastScrape(t time.Time) { - ts.mu.Lock() - defer ts.mu.Unlock() - - ts.lastScrape = t -} - -func (ts *TargetStatus) setLastError(err error) { - ts.mu.Lock() - defer ts.mu.Unlock() - - if err == nil { - ts.health = HealthGood - } else { - ts.health = HealthBad - } - ts.lastError = err -} - // Target refers to a singular HTTP or HTTPS endpoint. type Target struct { - // The status object for the target. It is only set once on initialization. - status *TargetStatus - // Closing scraperStopping signals that scraping should stop. - scraperStopping chan struct{} - // Closing scraperStopped signals that scraping has been stopped. - scraperStopped chan struct{} - - // Mutex protects the members below. - sync.RWMutex - - scrapeConfig *config.ScrapeConfig - // Labels before any processing. metaLabels model.LabelSet // Any labels that are added to this target and its metrics. labels model.LabelSet + // Additional URL parmeters that are part of the target URL. + params url.Values - // The HTTP client used to scrape the target's endpoint. - httpClient *http.Client + mtx sync.RWMutex + lastError error + lastScrape time.Time + health TargetHealth } // NewTarget creates a reasonably configured target for querying. -func NewTarget(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) (*Target, error) { - t := &Target{ - status: &TargetStatus{}, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), +func NewTarget(labels, metaLabels model.LabelSet, params url.Values) *Target { + return &Target{ + labels: labels, + metaLabels: metaLabels, + params: params, + health: HealthUnknown, } - err := t.Update(cfg, labels, metaLabels) - return t, err -} - -// Status returns the status of the target. -func (t *Target) Status() *TargetStatus { - return t.status -} - -// Update overwrites settings in the target that are derived from the job config -// it belongs to. 
-func (t *Target) Update(cfg *config.ScrapeConfig, labels, metaLabels model.LabelSet) error { - t.Lock() - - t.scrapeConfig = cfg - t.labels = labels - t.metaLabels = metaLabels - - t.Unlock() - - httpClient, err := t.client() - if err != nil { - return fmt.Errorf("cannot create HTTP client: %s", err) - } - t.Lock() - t.httpClient = httpClient - t.Unlock() - - return nil } func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { @@ -265,15 +110,16 @@ func newHTTPClient(cfg *config.ScrapeConfig) (*http.Client, error) { } func (t *Target) String() string { - return t.host() + return t.URL().String() } -// fingerprint returns an identifying hash for the target. -func (t *Target) fingerprint() model.Fingerprint { - t.RLock() - defer t.RUnlock() +// hash returns an identifying hash for the target. +func (t *Target) hash() uint64 { + h := fnv.New64a() + h.Write([]byte(t.labels.Fingerprint().String())) + h.Write([]byte(t.URL().String())) - return t.labels.Fingerprint() + return h.Sum64() } // offset returns the time until the next scrape cycle for the target. @@ -282,7 +128,7 @@ func (t *Target) offset(interval time.Duration) time.Duration { var ( base = now % int64(interval) - offset = uint64(t.fingerprint()) % uint64(interval) + offset = t.hash() % uint64(interval) next = base + int64(offset) ) @@ -292,92 +138,27 @@ func (t *Target) offset(interval time.Duration) time.Duration { return time.Duration(next) } -func (t *Target) client() (*http.Client, error) { - t.RLock() - defer t.RUnlock() - - return newHTTPClient(t.scrapeConfig) -} - -func (t *Target) interval() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeInterval) -} - -func (t *Target) timeout() time.Duration { - t.RLock() - defer t.RUnlock() - - return time.Duration(t.scrapeConfig.ScrapeTimeout) -} - -func (t *Target) scheme() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.SchemeLabel]) -} - -func (t *Target) host() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.AddressLabel]) -} - -func (t *Target) path() string { - t.RLock() - defer t.RUnlock() - - return string(t.labels[model.MetricsPathLabel]) -} - -// wrapAppender wraps a SampleAppender for samples ingested from the target. -// RLock must be acquired by the caller. -func (t *Target) wrapAppender(app storage.SampleAppender) storage.SampleAppender { - // The relabelAppender has to be inside the label-modifying appenders - // so the relabeling rules are applied to the correct label set. - if mrc := t.scrapeConfig.MetricRelabelConfigs; len(mrc) > 0 { - app = relabelAppender{ - SampleAppender: app, - relabelings: mrc, +// Labels returns a copy of the set of all public labels of the target. +func (t *Target) Labels() model.LabelSet { + lset := make(model.LabelSet, len(t.labels)) + for ln, lv := range t.labels { + if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { + lset[ln] = lv } } - - if t.scrapeConfig.HonorLabels { - app = honorLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } else { - app = ruleLabelsAppender{ - SampleAppender: app, - labels: t.unlockedLabels(), - } - } - return app + return lset } -// wrapReportingAppender wraps an appender for target status report samples. -// It ignores any relabeling rules set for the target. -// RLock must not be acquired by the caller. 
-func (t *Target) wrapReportingAppender(app storage.SampleAppender) storage.SampleAppender { - return ruleLabelsAppender{ - SampleAppender: app, - labels: t.Labels(), - } +// MetaLabels returns a copy of the target's labels before any processing. +func (t *Target) MetaLabels() model.LabelSet { + return t.metaLabels.Clone() } // URL returns a copy of the target's URL. func (t *Target) URL() *url.URL { - t.RLock() - defer t.RUnlock() - params := url.Values{} - for k, v := range t.scrapeConfig.Params { + for k, v := range t.params { params[k] = make([]string, len(v)) copy(params[k], v) } @@ -402,191 +183,51 @@ func (t *Target) URL() *url.URL { } } -// InstanceIdentifier returns the identifier for the target. -func (t *Target) InstanceIdentifier() string { - return t.host() -} +func (t *Target) report(start time.Time, dur time.Duration, err error) { + t.mtx.Lock() + defer t.mtx.Unlock() -// RunScraper implements Target. -func (t *Target) RunScraper(sampleAppender storage.SampleAppender) { - defer close(t.scraperStopped) - - lastScrapeInterval := t.interval() - - log.Debugf("Starting scraper for target %v...", t) - - select { - case <-time.After(t.offset(lastScrapeInterval)): - // Continue after scraping offset. - case <-t.scraperStopping: - return - } - - ticker := time.NewTicker(lastScrapeInterval) - defer ticker.Stop() - - t.scrape(sampleAppender) - - // Explanation of the contraption below: - // - // In case t.scraperStopping has something to receive, we want to read - // from that channel rather than starting a new scrape (which might take very - // long). That's why the outer select has no ticker.C. Should t.scraperStopping - // not have anything to receive, we go into the inner select, where ticker.C - // is in the mix. - for { - select { - case <-t.scraperStopping: - return - default: - select { - case <-t.scraperStopping: - return - case <-ticker.C: - took := time.Since(t.status.LastScrape()) - - intervalStr := lastScrapeInterval.String() - - // On changed scrape interval the new interval becomes effective - // after the next scrape. - if iv := t.interval(); iv != lastScrapeInterval { - ticker.Stop() - ticker = time.NewTicker(iv) - lastScrapeInterval = iv - } - - targetIntervalLength.WithLabelValues(intervalStr).Observe( - float64(took) / float64(time.Second), // Sub-second precision. - ) - if sampleAppender.NeedsThrottling() { - targetSkippedScrapes.WithLabelValues(intervalStr).Inc() - t.status.setLastError(errSkippedScrape) - continue - } - t.scrape(sampleAppender) - } - } - } -} - -// StopScraper implements Target. 
-func (t *Target) StopScraper() { - log.Debugf("Stopping scraper for target %v...", t) - - close(t.scraperStopping) - <-t.scraperStopped - - log.Debugf("Scraper for target %v stopped.", t) -} - -const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` - -func (t *Target) scrape(appender storage.SampleAppender) error { - var ( - err error - start = time.Now() - ) - defer func(appender storage.SampleAppender) { - t.report(appender, start, time.Since(start), err) - }(appender) - - t.RLock() - - appender = t.wrapAppender(appender) - - client := t.httpClient - t.RUnlock() - - req, err := http.NewRequest("GET", t.URL().String(), nil) - if err != nil { - return err - } - req.Header.Add("Accept", acceptHeader) - - ctx, _ := context.WithTimeout(context.Background(), t.timeout()) - resp, err := ctxhttp.Do(ctx, client, req) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("server returned HTTP status %s", resp.Status) - } - - dec := expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)) - - sdec := expfmt.SampleDecoder{ - Dec: dec, - Opts: &expfmt.DecodeOptions{ - Timestamp: model.TimeFromUnixNano(start.UnixNano()), - }, - } - - var ( - samples model.Vector - numOutOfOrder int - logger = log.With("target", t.InstanceIdentifier()) - ) - for { - if err = sdec.Decode(&samples); err != nil { - break - } - for _, s := range samples { - err := appender.Append(s) - if err != nil { - if err == local.ErrOutOfOrderSample { - numOutOfOrder++ - } else { - logger.With("sample", s).Warnf("Error inserting sample: %s", err) - } - } - - } - } - if numOutOfOrder > 0 { - logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples") - } - - if err == io.EOF { - // Set err to nil since it is used in the scrape health recording. - err = nil - } - return err -} - -func (t *Target) report(app storage.SampleAppender, start time.Time, duration time.Duration, err error) { - t.status.setLastScrape(start) - t.status.setLastError(err) - - ts := model.TimeFromUnixNano(start.UnixNano()) - - var health model.SampleValue if err == nil { - health = 1 + t.health = HealthGood + } else { + t.health = HealthBad } - healthSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - }, - Timestamp: ts, - Value: health, - } - durationSample := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - }, - Timestamp: ts, - Value: model.SampleValue(float64(duration) / float64(time.Second)), - } - - app = t.wrapReportingAppender(app) - - app.Append(healthSample) - app.Append(durationSample) + t.lastError = err + t.lastScrape = start } +// LastError returns the error encountered during the last scrape. +func (t *Target) LastError() error { + t.mtx.RLock() + defer t.mtx.RUnlock() + + return t.lastError +} + +// LastScrape returns the time of the last scrape. +func (t *Target) LastScrape() time.Time { + t.mtx.RLock() + defer t.mtx.RUnlock() + + return t.lastScrape +} + +// Health returns the last known health state of the target. +func (t *Target) Health() TargetHealth { + t.mtx.RLock() + defer t.mtx.RUnlock() + + return t.health +} + +// Targets is a sortable list of targets. 
+type Targets []*Target + +func (ts Targets) Len() int { return len(ts) } +func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() } +func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } + // Merges the ingested sample's metric with the label set. On a collision the // value of the ingested label is stored in a label prefixed with 'exported_'. type ruleLabelsAppender struct { @@ -643,36 +284,3 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } - -// Labels returns a copy of the set of all public labels of the target. -func (t *Target) Labels() model.LabelSet { - t.RLock() - defer t.RUnlock() - - return t.unlockedLabels() -} - -// unlockedLabels does the same as Labels but does not lock the mutex (useful -// for internal usage when the mutex is already locked). -func (t *Target) unlockedLabels() model.LabelSet { - lset := make(model.LabelSet, len(t.labels)) - for ln, lv := range t.labels { - if !strings.HasPrefix(string(ln), model.ReservedLabelPrefix) { - lset[ln] = lv - } - } - - if _, ok := lset[model.InstanceLabel]; !ok { - lset[model.InstanceLabel] = t.labels[model.AddressLabel] - } - - return lset -} - -// MetaLabels returns a copy of the target's labels before any processing. -func (t *Target) MetaLabels() model.LabelSet { - t.RLock() - defer t.RUnlock() - - return t.metaLabels.Clone() -} diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 5fc72423b..851dc5403 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -16,7 +16,6 @@ package retrieval import ( "crypto/tls" "crypto/x509" - "errors" "fmt" "io/ioutil" "net/http" @@ -35,9 +34,8 @@ import ( func TestTargetLabels(t *testing.T) { target := newTestTarget("example.com:80", 0, model.LabelSet{"job": "some_job", "foo": "bar"}) want := model.LabelSet{ - model.JobLabel: "some_job", - model.InstanceLabel: "example.com:80", - "foo": "bar", + model.JobLabel: "some_job", + "foo": "bar", } got := target.Labels() if !reflect.DeepEqual(want, got) { @@ -91,484 +89,36 @@ func TestTargetOffset(t *testing.T) { } } -func TestTargetWrapReportingAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, +func TestTargetURL(t *testing.T) { + params := url.Values{ + "abc": []string{"foo", "bar", "baz"}, + "xyz": []string{"hoo"}, + } + labels := model.LabelSet{ + model.AddressLabel: "example.com:1234", + model.SchemeLabel: "https", + model.MetricsPathLabel: "/metricz", + "__param_abc": "overwrite", + "__param_cde": "huu", + } + target := NewTarget(labels, labels, params) + + // The reserved labels are concatenated into a full URL. The first value for each + // URL query parameter can be set/modified via labels as well. 
+ expectedParams := url.Values{ + "abc": []string{"overwrite", "bar", "baz"}, + "cde": []string{"huu"}, + "xyz": []string{"hoo"}, + } + expectedURL := url.URL{ + Scheme: "https", + Host: "example.com:1234", + Path: "/metricz", + RawQuery: expectedParams.Encode(), } - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapReportingAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if rl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", rl.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapReportingAppender(app) - - hl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - if hl.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", hl.SampleAppender) - } -} - -func TestTargetWrapAppender(t *testing.T) { - cfg := &config.ScrapeConfig{ - MetricRelabelConfigs: []*config.RelabelConfig{ - {}, {}, {}, - }, - } - - target := newTestTarget("example.com:80", 10*time.Millisecond, nil) - target.scrapeConfig = cfg - app := &nopAppender{} - - cfg.HonorLabels = false - wrapped := target.wrapAppender(app) - - rl, ok := wrapped.(ruleLabelsAppender) - if !ok { - t.Fatalf("Expected ruleLabelsAppender but got %T", wrapped) - } - re, ok := rl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", rl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } - - cfg.HonorLabels = true - wrapped = target.wrapAppender(app) - - hl, ok := wrapped.(honorLabelsAppender) - if !ok { - t.Fatalf("Expected honorLabelsAppender but got %T", wrapped) - } - re, ok = hl.SampleAppender.(relabelAppender) - if !ok { - t.Fatalf("Expected relabelAppender but got %T", hl.SampleAppender) - } - if re.SampleAppender != app { - t.Fatalf("Expected base appender but got %T", re.SampleAppender) - } -} - -func TestOverwriteLabels(t *testing.T) { - type test struct { - metric string - resultNormal model.Metric - resultHonor model.Metric - } - var tests []test - - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for _, test := range tests { - w.Write([]byte(test.metric)) - w.Write([]byte(" 1\n")) - } - }, - ), - ) - defer server.Close() - addr := model.LabelValue(strings.Split(server.URL, "://")[1]) - - tests = []test{ - { - metric: `foo{}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - }, - { - metric: `foo{instance=""}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - }, - }, - { - metric: `foo{instance="other_instance"}`, - resultNormal: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: addr, - model.ExportedLabelPrefix + model.InstanceLabel: "other_instance", - }, - resultHonor: model.Metric{ - model.MetricNameLabel: "foo", - model.InstanceLabel: "other_instance", - }, - }, - } - - target := newTestTarget(server.URL, time.Second, nil) - - target.scrapeConfig.HonorLabels = false - app := &collectResultAppender{} - if err := 
target.scrape(app); err != nil { - t.Fatal(err) - } - - for i, test := range tests { - if !reflect.DeepEqual(app.result[i].Metric, test.resultNormal) { - t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultNormal, app.result[i].Metric) - } - } - - target.scrapeConfig.HonorLabels = true - app = &collectResultAppender{} - if err := target.scrape(app); err != nil { - t.Fatal(err) - } - - for i, test := range tests { - if !reflect.DeepEqual(app.result[i].Metric, test.resultHonor) { - t.Errorf("Error comparing %q:\nExpected:\n%s\nGot:\n%s\n", test.metric, test.resultHonor, app.result[i].Metric) - } - - } -} -func TestTargetScrapeUpdatesState(t *testing.T) { - testTarget := newTestTarget("bad schema", 0, nil) - - testTarget.scrape(nopAppender{}) - if testTarget.status.Health() != HealthBad { - t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) - } -} - -func TestTargetScrapeWithThrottledStorage(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - for i := 0; i < 10; i++ { - w.Write([]byte( - fmt.Sprintf("test_metric_%d{foo=\"bar\"} 123.456\n", i), - )) - } - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - - go testTarget.RunScraper(&collectResultAppender{throttled: true}) - - // Enough time for a scrape to happen. - time.Sleep(20 * time.Millisecond) - - testTarget.StopScraper() - // Wait for it to take effect. - time.Sleep(20 * time.Millisecond) - - if testTarget.status.Health() != HealthBad { - t.Errorf("Expected target state %v, actual: %v", HealthBad, testTarget.status.Health()) - } - if testTarget.status.LastError() != errSkippedScrape { - t.Errorf("Expected target error %q, actual: %q", errSkippedScrape, testTarget.status.LastError()) - } -} - -func TestTargetScrapeMetricRelabelConfigs(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("test_metric_drop 0\n")) - w.Write([]byte("test_metric_relabel 1\n")) - }, - ), - ) - defer server.Close() - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) - testTarget.scrapeConfig.MetricRelabelConfigs = []*config.RelabelConfig{ - { - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp(".*drop.*"), - Action: config.RelabelDrop, - }, - { - SourceLabels: model.LabelNames{"__name__"}, - Regex: config.MustNewRegexp(".*(relabel|up).*"), - TargetLabel: "foo", - Replacement: "bar", - Action: config.RelabelReplace, - }, - } - - appender := &collectResultAppender{} - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // Remove variables part of result. - for _, sample := range appender.result { - sample.Timestamp = 0 - sample.Value = 0 - } - - expected := []*model.Sample{ - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric_relabel", - "foo": "bar", - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - // The metrics about the scrape are not affected. 
- { - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - { - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - model.InstanceLabel: model.LabelValue(testTarget.host()), - }, - Timestamp: 0, - Value: 0, - }, - } - - if !appender.result.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %s, actual: %s", expected, appender.result) - } - -} - -func TestTargetRecordScrapeHealth(t *testing.T) { - var ( - testTarget = newTestTarget("example.url:80", 0, model.LabelSet{model.JobLabel: "testjob"}) - now = model.Now() - appender = &collectResultAppender{} - ) - - testTarget.report(appender, now.Time(), 2*time.Second, nil) - - result := appender.result - - if len(result) != 2 { - t.Fatalf("Expected two samples, got %d", len(result)) - } - - actual := result[0] - expected := &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeHealthMetricName, - model.InstanceLabel: "example.url:80", - model.JobLabel: "testjob", - }, - Timestamp: now, - Value: 1, - } - - if !actual.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual) - } - - actual = result[1] - expected = &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: scrapeDurationMetricName, - model.InstanceLabel: "example.url:80", - model.JobLabel: "testjob", - }, - Timestamp: now, - Value: 2.0, - } - - if !actual.Equal(expected) { - t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual) - } -} - -func TestTargetScrapeTimeout(t *testing.T) { - signal := make(chan bool, 1) - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - <-signal - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte{}) - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, 50*time.Millisecond, model.LabelSet{}) - - appender := nopAppender{} - - // scrape once without timeout - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // let the deadline lapse - time.Sleep(55 * time.Millisecond) - - // now scrape again - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } - - // now timeout - if err := testTarget.scrape(appender); err == nil { - t.Fatal("expected scrape to timeout") - } else { - signal <- true // let handler continue - } - - // now scrape again without timeout - signal <- true - if err := testTarget.scrape(appender); err != nil { - t.Fatal(err) - } -} - -func TestTargetScrape404(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{}) - appender := nopAppender{} - - want := errors.New("server returned HTTP status 404 Not Found") - got := testTarget.scrape(appender) - if got == nil || want.Error() != got.Error() { - t.Fatalf("want err %q, got %q", want, got) - } -} - -func TestTargetRunScraperScrapes(t *testing.T) { - testTarget := newTestTarget("bad schema", 0, nil) - - go testTarget.RunScraper(nopAppender{}) - - // Enough time for a scrape to happen. 
- time.Sleep(20 * time.Millisecond) - if testTarget.status.LastScrape().IsZero() { - t.Errorf("Scrape hasn't occured.") - } - - testTarget.StopScraper() - // Wait for it to take effect. - time.Sleep(20 * time.Millisecond) - last := testTarget.status.LastScrape() - // Enough time for a scrape to happen. - time.Sleep(20 * time.Millisecond) - if testTarget.status.LastScrape() != last { - t.Errorf("Scrape occured after it was stopped.") - } -} - -func BenchmarkScrape(b *testing.B) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte("test_metric{foo=\"bar\"} 123.456\n")) - }, - ), - ) - defer server.Close() - - testTarget := newTestTarget(server.URL, time.Second, model.LabelSet{"dings": "bums"}) - appender := nopAppender{} - - b.ResetTimer() - for i := 0; i < b.N; i++ { - if err := testTarget.scrape(appender); err != nil { - b.Fatal(err) - } - } -} - -func TestURLParams(t *testing.T) { - server := httptest.NewServer( - http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", `text/plain; version=0.0.4`) - w.Write([]byte{}) - r.ParseForm() - if r.Form["foo"][0] != "bar" { - t.Fatalf("URL parameter 'foo' had unexpected first value '%v'", r.Form["foo"][0]) - } - if r.Form["foo"][1] != "baz" { - t.Fatalf("URL parameter 'foo' had unexpected second value '%v'", r.Form["foo"][1]) - } - }, - ), - ) - defer server.Close() - serverURL, err := url.Parse(server.URL) - if err != nil { - t.Fatal(err) - } - - target, err := NewTarget( - &config.ScrapeConfig{ - JobName: "test_job1", - ScrapeInterval: model.Duration(1 * time.Minute), - ScrapeTimeout: model.Duration(1 * time.Second), - Scheme: serverURL.Scheme, - Params: url.Values{ - "foo": []string{"bar", "baz"}, - }, - }, - model.LabelSet{ - model.SchemeLabel: model.LabelValue(serverURL.Scheme), - model.AddressLabel: model.LabelValue(serverURL.Host), - "__param_foo": "bar", - }, - nil, - ) - if err != nil { - t.Fatal(err) - } - app := &collectResultAppender{} - if err = target.scrape(app); err != nil { - t.Fatal(err) + if u := target.URL(); !reflect.DeepEqual(u.String(), expectedURL.String()) { + t.Fatalf("Expected URL %q but got %q", expectedURL, u) } } @@ -578,23 +128,9 @@ func newTestTarget(targetURL string, deadline time.Duration, labels model.LabelS labels[model.AddressLabel] = model.LabelValue(strings.TrimLeft(targetURL, "http://")) labels[model.MetricsPathLabel] = "/metrics" - t := &Target{ - scrapeConfig: &config.ScrapeConfig{ - ScrapeInterval: model.Duration(time.Millisecond), - ScrapeTimeout: model.Duration(deadline), - }, - labels: labels, - status: &TargetStatus{}, - scraperStopping: make(chan struct{}), - scraperStopped: make(chan struct{}), + return &Target{ + labels: labels, } - - var err error - if t.httpClient, err = t.client(); err != nil { - panic(err) - } - - return t } func TestNewHTTPBearerToken(t *testing.T) { @@ -766,7 +302,7 @@ func newTLSConfig(t *testing.T) *tls.Config { return tlsConfig } -func TestNewTargetWithBadTLSConfig(t *testing.T) { +func TestNewClientWithBadTLSConfig(t *testing.T) { cfg := &config.ScrapeConfig{ ScrapeTimeout: model.Duration(1 * time.Second), TLSConfig: config.TLSConfig{ @@ -775,7 +311,7 @@ func TestNewTargetWithBadTLSConfig(t *testing.T) { KeyFile: "testdata/nonexistent_client.key", }, } - _, err := NewTarget(cfg, nil, nil) + _, err := newHTTPClient(cfg) if err == nil { t.Fatalf("Expected error, got nil.") } diff --git 
a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 6f06438b4..6daa708fc 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -17,9 +17,11 @@ import ( "fmt" "strings" "sync" + "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/retrieval/discovery" @@ -33,285 +35,102 @@ import ( // The TargetProvider does not have to guarantee that an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // -// Sources() is guaranteed to be called exactly once before each call to Run(). -// On a call to Run() implementing types must send a valid target group for each of -// the sources they declared in the last call to Sources(). +// Providers must initially send all known target groups as soon as it can. type TargetProvider interface { - // Sources returns the source identifiers the provider is currently aware of. - Sources() []string // Run hands a channel to the target provider through which it can send // updated target groups. The channel must be closed by the target provider // if no more updates will be sent. // On receiving from done Run must return. - Run(up chan<- config.TargetGroup, done <-chan struct{}) + Run(ctx context.Context, up chan<- []*config.TargetGroup) } // TargetManager maintains a set of targets, starts and stops their scraping and // creates the new targets based on the target groups it receives from various // target providers. type TargetManager struct { - mtx sync.RWMutex - sampleAppender storage.SampleAppender - running bool - done chan struct{} + appender storage.SampleAppender + scrapeConfigs []*config.ScrapeConfig - // Targets by their source ID. - targets map[string][]*Target - // Providers by the scrape configs they are derived from. - providers map[*config.ScrapeConfig][]TargetProvider + mtx sync.RWMutex + ctx context.Context + cancel func() + wg sync.WaitGroup + + // Set of unqiue targets by scrape configuration. + targetSets map[string]*targetSet } // NewTargetManager creates a new TargetManager. -func NewTargetManager(sampleAppender storage.SampleAppender) *TargetManager { - tm := &TargetManager{ - sampleAppender: sampleAppender, - targets: map[string][]*Target{}, +func NewTargetManager(app storage.SampleAppender) *TargetManager { + return &TargetManager{ + appender: app, + targetSets: map[string]*targetSet{}, } - return tm -} - -// merge multiple target group channels into a single output channel. -func merge(done <-chan struct{}, cs ...<-chan targetGroupUpdate) <-chan targetGroupUpdate { - var wg sync.WaitGroup - out := make(chan targetGroupUpdate) - - // Start an output goroutine for each input channel in cs. output - // copies values from c to out until c or done is closed, then calls - // wg.Done. - redir := func(c <-chan targetGroupUpdate) { - defer wg.Done() - for n := range c { - select { - case out <- n: - case <-done: - return - } - } - } - - wg.Add(len(cs)) - for _, c := range cs { - go redir(c) - } - - // Close the out channel if all inbound channels are closed. - go func() { - wg.Wait() - close(out) - }() - return out -} - -// targetGroupUpdate is a potentially changed/new target group -// for the given scrape configuration. -type targetGroupUpdate struct { - tg config.TargetGroup - scfg *config.ScrapeConfig } // Run starts background processing to handle target updates. 
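The reworked TargetProvider contract is the core of this change: providers receive a context for cancellation, push batches of []*config.TargetGroup, and are expected to send their full known set up front (the target manager waits briefly for that initial batch). A toy implementation sketch against the new interface; staticListProvider is an invented name, not part of the patch:

```go
package retrieval

import (
	"time"

	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
)

// staticListProvider is a toy TargetProvider: it sends one initial batch of
// target groups and then a refreshed copy on every tick until the context is
// canceled.
type staticListProvider struct {
	addrs    []string
	interval time.Duration
}

// Run implements the new TargetProvider interface.
func (p *staticListProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	// The channel must be closed once no more updates will be sent.
	defer close(up)

	ticker := time.NewTicker(p.interval)
	defer ticker.Stop()

	for {
		// Send the complete known set; the first send doubles as the initial
		// set the target manager waits for.
		select {
		case up <- p.groups():
		case <-ctx.Done():
			return
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

// groups builds one target group covering all configured addresses.
func (p *staticListProvider) groups() []*config.TargetGroup {
	tg := &config.TargetGroup{Source: "static-list"}
	for _, a := range p.addrs {
		tg.Targets = append(tg.Targets, model.LabelSet{
			model.AddressLabel: model.LabelValue(a),
		})
	}
	return []*config.TargetGroup{tg}
}
```

Real providers built on this contract follow the same shape: close the channel when no more updates will come, return when the context is canceled, and send complete target groups rather than deltas.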
func (tm *TargetManager) Run() { log.Info("Starting target manager...") - tm.done = make(chan struct{}) - - sources := map[string]struct{}{} - updates := []<-chan targetGroupUpdate{} - - for scfg, provs := range tm.providers { - for _, prov := range provs { - // Get an initial set of available sources so we don't remove - // target groups from the last run that are still available. - for _, src := range prov.Sources() { - sources[src] = struct{}{} - } - - tgc := make(chan config.TargetGroup) - // Run the target provider after cleanup of the stale targets is done. - defer func(prov TargetProvider, tgc chan<- config.TargetGroup, done <-chan struct{}) { - go prov.Run(tgc, done) - }(prov, tgc, tm.done) - - tgupc := make(chan targetGroupUpdate) - updates = append(updates, tgupc) - - go func(scfg *config.ScrapeConfig, done <-chan struct{}) { - defer close(tgupc) - for { - select { - case tg := <-tgc: - tgupc <- targetGroupUpdate{tg: tg, scfg: scfg} - case <-done: - return - } - } - }(scfg, tm.done) - } - } - - // Merge all channels of incoming target group updates into a single - // one and keep applying the updates. - go tm.handleUpdates(merge(tm.done, updates...), tm.done) - tm.mtx.Lock() - defer tm.mtx.Unlock() - // Remove old target groups that are no longer in the set of sources. - tm.removeTargets(func(src string) bool { - if _, ok := sources[src]; ok { - return false - } - return true - }) + tm.ctx, tm.cancel = context.WithCancel(context.Background()) + tm.reload() - tm.running = true - log.Info("Target manager started.") -} + tm.mtx.Unlock() -// handleUpdates receives target group updates and handles them in the -// context of the given job config. -func (tm *TargetManager) handleUpdates(ch <-chan targetGroupUpdate, done <-chan struct{}) { - for { - select { - case update, ok := <-ch: - if !ok { - return - } - log.Debugf("Received potential update for target group %q", update.tg.Source) - - if err := tm.updateTargetGroup(&update.tg, update.scfg); err != nil { - log.Errorf("Error updating targets: %s", err) - } - case <-done: - return - } - } + tm.wg.Wait() } // Stop all background processing. func (tm *TargetManager) Stop() { - tm.mtx.RLock() - if tm.running { - defer tm.stop(true) - } - // Return the lock before calling tm.stop(). - defer tm.mtx.RUnlock() -} - -// stop background processing of the target manager. If removeTargets is true, -// existing targets will be stopped and removed. -func (tm *TargetManager) stop(removeTargets bool) { - log.Info("Stopping target manager...") - defer log.Info("Target manager stopped.") - - close(tm.done) + log.Infoln("Stopping target manager...") tm.mtx.Lock() - defer tm.mtx.Unlock() + // Cancel the base context, this will cause all target providers to shut down + // and all in-flight scrapes to abort immmediately. + // Started inserts will be finished before terminating. + tm.cancel() + tm.mtx.Unlock() - if removeTargets { - tm.removeTargets(nil) - } + // Wait for all scrape inserts to complete. + tm.wg.Wait() - tm.running = false + log.Debugln("Target manager stopped") } -// removeTargets stops and removes targets for sources where f(source) is true -// or if f is nil. This method is not thread-safe. 
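Lifecycle management is now the standard cancel-and-wait idiom: Run creates a base context and starts one scraping goroutine per target set, tracked in a WaitGroup; Stop cancels that context and blocks until every goroutine has drained. A generic sketch of the idiom with invented manager/worker names, not the Prometheus types:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/net/context"
)

type manager struct {
	mtx    sync.Mutex
	ctx    context.Context
	cancel func()
	wg     sync.WaitGroup
}

// Run starts one worker per job and blocks until all of them have finished.
func (m *manager) Run(jobs []string) {
	m.mtx.Lock()
	m.ctx, m.cancel = context.WithCancel(context.Background())

	for _, job := range jobs {
		m.wg.Add(1)
		go func(job string) {
			defer m.wg.Done()
			<-m.ctx.Done() // a real worker would keep scraping until canceled
			fmt.Println("stopped", job)
		}(job)
	}
	m.mtx.Unlock()

	m.wg.Wait()
}

// Stop cancels the shared context and waits for every worker to drain.
func (m *manager) Stop() {
	m.mtx.Lock()
	m.cancel()
	m.mtx.Unlock()

	m.wg.Wait()
}

func main() {
	m := &manager{}
	go m.Run([]string{"job-a", "job-b"})

	time.Sleep(50 * time.Millisecond)
	m.Stop()
}
```

As in the patch, Run itself also blocks on the WaitGroup, so the caller decides whether to drive it from its own goroutine.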
-func (tm *TargetManager) removeTargets(f func(string) bool) { - if f == nil { - f = func(string) bool { return true } - } - var wg sync.WaitGroup - for src, targets := range tm.targets { - if !f(src) { - continue - } - wg.Add(len(targets)) - for _, target := range targets { - go func(t *Target) { - t.StopScraper() - wg.Done() - }(target) - } - delete(tm.targets, src) - } - wg.Wait() -} +func (tm *TargetManager) reload() { + jobs := map[string]struct{}{} -// updateTargetGroup creates new targets for the group and replaces the old targets -// for the source ID. -func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *config.ScrapeConfig) error { - newTargets, err := tm.targetsFromGroup(tgroup, cfg) - if err != nil { - return err + // Start new target sets and update existing ones. + for _, scfg := range tm.scrapeConfigs { + jobs[scfg.JobName] = struct{}{} + + ts, ok := tm.targetSets[scfg.JobName] + if !ok { + ts = newTargetSet(scfg, tm.appender) + tm.targetSets[scfg.JobName] = ts + + tm.wg.Add(1) + + go func(ts *targetSet) { + ts.runScraping(tm.ctx) + tm.wg.Done() + }(ts) + } else { + ts.reload(scfg) + } + ts.runProviders(tm.ctx, providersFromConfig(scfg)) } - tm.mtx.Lock() - defer tm.mtx.Unlock() - - if !tm.running { - return nil - } - - oldTargets, ok := tm.targets[tgroup.Source] - if ok { - var wg sync.WaitGroup - // Replace the old targets with the new ones while keeping the state - // of intersecting targets. - for i, tnew := range newTargets { - var match *Target - for j, told := range oldTargets { - if told == nil { - continue - } - if tnew.fingerprint() == told.fingerprint() { - match = told - oldTargets[j] = nil - break - } - } - // Update the existing target and discard the new equivalent. - // Otherwise start scraping the new target. - if match != nil { - // Updating is blocked during a scrape. We don't want those wait times - // to build up. - wg.Add(1) - go func(t *Target) { - if err := match.Update(cfg, t.labels, t.metaLabels); err != nil { - log.Errorf("Error updating target %v: %v", t, err) - } - wg.Done() - }(tnew) - newTargets[i] = match - } else { - go tnew.RunScraper(tm.sampleAppender) - } - } - // Remove all old targets that disappeared. - for _, told := range oldTargets { - if told != nil { - wg.Add(1) - go func(t *Target) { - t.StopScraper() - wg.Done() - }(told) - } - } - wg.Wait() - } else { - // The source ID is new, start all target scrapers. - for _, tnew := range newTargets { - go tnew.RunScraper(tm.sampleAppender) + // Remove old target sets. Waiting for stopping is already guaranteed + // by the goroutine that started the target set. + for name, ts := range tm.targetSets { + if _, ok := jobs[name]; !ok { + ts.cancel() + delete(tm.targetSets, name) } } - - if len(newTargets) > 0 { - tm.targets[tgroup.Source] = newTargets - } else { - delete(tm.targets, tgroup.Source) - } - return nil } // Pools returns the targets currently being scraped bucketed by their job name. @@ -321,11 +140,16 @@ func (tm *TargetManager) Pools() map[string][]*Target { pools := map[string][]*Target{} - for _, ts := range tm.targets { - for _, t := range ts { + // TODO(fabxc): this is just a hack to maintain compatibility for now. + for _, ps := range tm.targetSets { + ps.scrapePool.mtx.RLock() + + for _, t := range ps.scrapePool.targets { job := string(t.Labels()[model.JobLabel]) pools[job] = append(pools[job], t) } + + ps.scrapePool.mtx.RUnlock() } return pools } @@ -334,79 +158,196 @@ func (tm *TargetManager) Pools() map[string][]*Target { // by the new cfg. 
The state of targets that are valid in the new configuration remains unchanged. // Returns true on success. func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { - tm.mtx.RLock() - running := tm.running - tm.mtx.RUnlock() - - if running { - tm.stop(false) - // Even if updating the config failed, we want to continue rather than stop scraping anything. - defer tm.Run() - } - providers := map[*config.ScrapeConfig][]TargetProvider{} - - for _, scfg := range cfg.ScrapeConfigs { - providers[scfg] = providersFromConfig(scfg) - } - tm.mtx.Lock() defer tm.mtx.Unlock() - tm.providers = providers + tm.scrapeConfigs = cfg.ScrapeConfigs + + if tm.ctx != nil { + tm.reload() + } return true } -// prefixedTargetProvider wraps TargetProvider and prefixes source strings -// to make the sources unique across a configuration. -type prefixedTargetProvider struct { - TargetProvider +// targetSet holds several TargetProviders for which the same scrape configuration +// is used. It maintains target groups from all given providers and sync them +// to a scrape pool. +type targetSet struct { + mtx sync.RWMutex - job string - mechanism string - idx int + // Sets of targets by a source string that is unique across target providers. + tgroups map[string][]*Target + providers map[string]TargetProvider + + scrapePool *scrapePool + config *config.ScrapeConfig + + syncCh chan struct{} + cancelScraping func() + cancelProviders func() } -func (tp *prefixedTargetProvider) prefix(src string) string { - return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src) -} - -func (tp *prefixedTargetProvider) Sources() []string { - srcs := tp.TargetProvider.Sources() - for i, src := range srcs { - srcs[i] = tp.prefix(src) +func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { + ts := &targetSet{ + tgroups: map[string][]*Target{}, + scrapePool: newScrapePool(cfg, app), + syncCh: make(chan struct{}, 1), + config: cfg, } - - return srcs + return ts } -func (tp *prefixedTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { - defer close(ch) +func (ts *targetSet) cancel() { + ts.mtx.RLock() + defer ts.mtx.RUnlock() - ch2 := make(chan config.TargetGroup) - go tp.TargetProvider.Run(ch2, done) + if ts.cancelScraping != nil { + ts.cancelScraping() + } + if ts.cancelProviders != nil { + ts.cancelProviders() + } +} +func (ts *targetSet) reload(cfg *config.ScrapeConfig) { + ts.mtx.Lock() + ts.config = cfg + ts.mtx.Unlock() + + ts.scrapePool.reload(cfg) +} + +func (ts *targetSet) runScraping(ctx context.Context) { + ctx, ts.cancelScraping = context.WithCancel(ctx) + + ts.scrapePool.ctx = ctx + +Loop: for { + // Throttle syncing to once per five seconds. select { - case <-done: - return - case tg := <-ch2: - tg.Source = tp.prefix(tg.Source) - ch <- tg + case <-ctx.Done(): + break Loop + case <-time.After(5 * time.Second): + } + + select { + case <-ctx.Done(): + break Loop + case <-ts.syncCh: + ts.mtx.RLock() + ts.sync() + ts.mtx.RUnlock() } } + + // We want to wait for all pending target scrapes to complete though to ensure there'll + // be no more storage writes after this point. + ts.scrapePool.stop() +} + +func (ts *targetSet) sync() { + var all []*Target + for _, targets := range ts.tgroups { + all = append(all, targets...) + } + ts.scrapePool.sync(all) +} + +func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) { + // Lock for the entire time. This may mean up to 5 seconds until the full initial set + // is retrieved and applied. 
+ // We could release earlier with some tweaks, but this is easier to reason about. + ts.mtx.Lock() + defer ts.mtx.Unlock() + + var wg sync.WaitGroup + + if ts.cancelProviders != nil { + ts.cancelProviders() + } + ctx, ts.cancelProviders = context.WithCancel(ctx) + + for name, prov := range providers { + wg.Add(1) + + updates := make(chan []*config.TargetGroup) + + go func(name string, prov TargetProvider) { + var initial []*config.TargetGroup + + select { + case <-ctx.Done(): + wg.Done() + return + case initial = <-updates: + // First set of all targets the provider knows. + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + + for _, tgroup := range initial { + targets, err := targetsFromGroup(tgroup, ts.config) + if err != nil { + log.With("target_group", tgroup).Errorf("Target update failed: %s", err) + continue + } + ts.tgroups[name+"/"+tgroup.Source] = targets + } + + wg.Done() + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs := <-updates: + for _, tg := range tgs { + if err := ts.update(name, tg); err != nil { + log.With("target_group", tg).Errorf("Target update failed: %s", err) + } + } + } + } + }(name, prov) + + go prov.Run(ctx, updates) + } + + // We wait for a full initial set of target groups before releasing the mutex + // to ensure the initial sync is complete and there are no races with subsequent updates. + wg.Wait() + ts.sync() +} + +// update handles a target group update from a target provider identified by the name. +func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { + targets, err := targetsFromGroup(tgroup, ts.config) + if err != nil { + return err + } + + ts.mtx.Lock() + defer ts.mtx.Unlock() + + ts.tgroups[name+"/"+tgroup.Source] = targets + + select { + case ts.syncCh <- struct{}{}: + default: + } + + return nil } // providersFromConfig returns all TargetProviders configured in cfg. -func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { - var providers []TargetProvider +func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider { + providers := map[string]TargetProvider{} app := func(mech string, i int, tp TargetProvider) { - providers = append(providers, &prefixedTargetProvider{ - job: cfg.JobName, - mechanism: mech, - idx: i, - TargetProvider: tp, - }) + providers[fmt.Sprintf("%s/%d", mech, i)] = tp } for i, c := range cfg.DNSSDConfigs { @@ -451,11 +392,9 @@ func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { } // targetsFromGroup builds targets based on the given TargetGroup and config. 
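targetSet.update never blocks on the scraping side: syncCh is a capacity-one channel used as a dirty flag, so a burst of target-group updates collapses into a single pending sync that runScraping picks up on its throttled, five-second schedule. A small standalone sketch of that coalescing pattern:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered with capacity 1: at most one "please sync" signal is pending.
	syncCh := make(chan struct{}, 1)

	// Producer side: mark dirty without ever blocking. Calls made while a
	// signal is already pending simply fall through to the default case.
	markDirty := func() {
		select {
		case syncCh <- struct{}{}:
		default:
		}
	}

	// Consumer side: handle one pending signal at a time. The sleep stands in
	// for the scrape loop's throttling, so bursts collapse into few syncs.
	go func() {
		for range syncCh {
			fmt.Println("syncing target set")
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for i := 0; i < 10; i++ {
		markDirty() // ten rapid updates ...
	}
	time.Sleep(300 * time.Millisecond) // ... coalesce into one or two syncs
}
```

Dropping the signal when one is already pending is safe because sync() always rebuilds the target list from the complete tgroups map.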
-func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - +func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { targets := make([]*Target, 0, len(tg.Targets)) + for i, labels := range tg.Targets { for k, v := range cfg.Params { if len(v) > 0 { @@ -518,11 +457,12 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc delete(labels, ln) } } - tr, err := NewTarget(cfg, labels, preRelabelLabels) - if err != nil { - return nil, fmt.Errorf("error while creating instance %d in target group %s: %s", i, tg, err) + + if _, ok := labels[model.InstanceLabel]; !ok { + labels[model.InstanceLabel] = labels[model.AddressLabel] } - targets = append(targets, tr) + + targets = append(targets, NewTarget(labels, preRelabelLabels, cfg.Params)) } return targets, nil @@ -539,29 +479,16 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { for i, tg := range groups { tg.Source = fmt.Sprintf("%d", i) } - return &StaticProvider{ - TargetGroups: groups, - } + return &StaticProvider{groups} } // Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) { - defer close(ch) - - for _, tg := range sd.TargetGroups { - select { - case <-done: - return - case ch <- *tg: - } +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): } - <-done -} - -// Sources returns the provider's sources. -func (sd *StaticProvider) Sources() (srcs []string) { - for _, tg := range sd.TargetGroups { - srcs = append(srcs, tg.Source) - } - return srcs + close(ch) } diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 2e7d95db2..074fe51ec 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -12,492 +12,3 @@ // limitations under the License. package retrieval - -import ( - "net/url" - "reflect" - "testing" - "time" - - "github.com/prometheus/common/model" - - "github.com/prometheus/prometheus/config" -) - -func TestPrefixedTargetProvider(t *testing.T) { - targetGroups := []*config.TargetGroup{ - { - Targets: []model.LabelSet{ - {model.AddressLabel: "test-1:1234"}, - }, - }, { - Targets: []model.LabelSet{ - {model.AddressLabel: "test-1:1235"}, - }, - }, - } - - tp := &prefixedTargetProvider{ - job: "job-x", - mechanism: "static", - idx: 123, - TargetProvider: NewStaticProvider(targetGroups), - } - - expSources := []string{ - "job-x:static:123:0", - "job-x:static:123:1", - } - if !reflect.DeepEqual(tp.Sources(), expSources) { - t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) - } - - ch := make(chan config.TargetGroup) - done := make(chan struct{}) - - defer close(done) - go tp.Run(ch, done) - - expGroup1 := *targetGroups[0] - expGroup2 := *targetGroups[1] - expGroup1.Source = "job-x:static:123:0" - expGroup2.Source = "job-x:static:123:1" - - // The static target provider sends on the channel once per target group. 
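From the consumer side, the slimmed-down StaticProvider shows the minimal interaction with the new interface: run it with a cancelable context, read the single batch it sends, and stop when it closes the channel. A usage sketch, assuming the retrieval package's NewStaticProvider as changed in this patch:

```go
package retrieval

import (
	"fmt"

	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/config"
)

// consumeStatic shows the consumer side of the new contract: run the provider
// with a cancelable context and read batches until it closes the channel.
func consumeStatic() {
	groups := []*config.TargetGroup{
		{Targets: []model.LabelSet{{model.AddressLabel: "example.org:80"}}},
		{Targets: []model.LabelSet{{model.AddressLabel: "example.com:80"}}},
	}
	prov := NewStaticProvider(groups) // assigns sources "0", "1"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan []*config.TargetGroup)
	go prov.Run(ctx, ch)

	// StaticProvider sends exactly one batch and then closes the channel.
	for tgs := range ch {
		for _, tg := range tgs {
			fmt.Println(tg.Source, tg.Targets)
		}
	}
}
```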
- if tg := <-ch; !reflect.DeepEqual(tg, expGroup1) { - t.Fatalf("expected target group %v, got %v", expGroup1, tg) - } - if tg := <-ch; !reflect.DeepEqual(tg, expGroup2) { - t.Fatalf("expected target group %v, got %v", expGroup2, tg) - } -} - -func TestTargetManagerChan(t *testing.T) { - testJob1 := &config.ScrapeConfig{ - JobName: "test_job1", - ScrapeInterval: model.Duration(1 * time.Minute), - TargetGroups: []*config.TargetGroup{{ - Targets: []model.LabelSet{ - {model.AddressLabel: "example.org:80"}, - {model.AddressLabel: "example.com:80"}, - }, - }}, - } - prov1 := &fakeTargetProvider{ - sources: []string{"src1", "src2"}, - update: make(chan *config.TargetGroup), - } - - targetManager := &TargetManager{ - sampleAppender: nopAppender{}, - providers: map[*config.ScrapeConfig][]TargetProvider{ - testJob1: {prov1}, - }, - targets: make(map[string][]*Target), - } - go targetManager.Run() - defer targetManager.Stop() - - sequence := []struct { - tgroup *config.TargetGroup - expected map[string][]model.LabelSet - }{ - { - tgroup: &config.TargetGroup{ - Source: "src1", - Targets: []model.LabelSet{ - {model.AddressLabel: "test-1:1234"}, - {model.AddressLabel: "test-2:1234", "label": "set"}, - {model.AddressLabel: "test-3:1234"}, - }, - }, - expected: map[string][]model.LabelSet{ - "src1": { - {model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"}, - }, - }, - }, { - tgroup: &config.TargetGroup{ - Source: "src2", - Targets: []model.LabelSet{ - {model.AddressLabel: "test-1:1235"}, - {model.AddressLabel: "test-2:1235"}, - {model.AddressLabel: "test-3:1235"}, - }, - Labels: model.LabelSet{"group": "label"}, - }, - expected: map[string][]model.LabelSet{ - "src1": { - {model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"}, - }, - "src2": { - {model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1235", "group": "label"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1235", "group": "label"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1235", "group": "label"}, - }, - }, - }, { - tgroup: &config.TargetGroup{ - Source: "src2", - Targets: []model.LabelSet{}, - }, - expected: map[string][]model.LabelSet{ - "src1": { - {model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-2:1234", "label": "set"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"}, - }, - }, - }, { - tgroup: &config.TargetGroup{ - Source: "src1", - Targets: []model.LabelSet{ - {model.AddressLabel: "test-1:1234", "added": "label"}, - {model.AddressLabel: "test-3:1234"}, - {model.AddressLabel: "test-4:1234", "fancy": "label"}, - }, - }, - expected: map[string][]model.LabelSet{ - "src1": { - {model.JobLabel: "test_job1", model.InstanceLabel: "test-1:1234", "added": "label"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-3:1234"}, - {model.JobLabel: "test_job1", model.InstanceLabel: "test-4:1234", "fancy": "label"}, - }, - }, - }, - } - - for i, step := range sequence { - prov1.update <- step.tgroup - - time.Sleep(20 * time.Millisecond) - - if len(targetManager.targets) != len(step.expected) { - t.Fatalf("step %d: sources mismatch %v, %v", i, targetManager.targets, step.expected) - 
} - - for source, actTargets := range targetManager.targets { - expTargets, ok := step.expected[source] - if !ok { - t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets) - } - for _, expt := range expTargets { - found := false - for _, actt := range actTargets { - if reflect.DeepEqual(expt, actt.Labels()) { - found = true - break - } - } - if !found { - t.Errorf("step %d: expected target %v not found in actual targets", i, expt) - } - } - } - } -} - -func TestTargetManagerConfigUpdate(t *testing.T) { - testJob1 := &config.ScrapeConfig{ - JobName: "test_job1", - ScrapeInterval: model.Duration(1 * time.Minute), - Params: url.Values{ - "testParam": []string{"paramValue", "secondValue"}, - }, - TargetGroups: []*config.TargetGroup{{ - Targets: []model.LabelSet{ - {model.AddressLabel: "example.org:80"}, - {model.AddressLabel: "example.com"}, - }, - }}, - RelabelConfigs: []*config.RelabelConfig{ - { - // Copy out the URL parameter. - SourceLabels: model.LabelNames{"__param_testParam"}, - Regex: config.MustNewRegexp("(.*)"), - TargetLabel: "testParam", - Replacement: "$1", - Action: config.RelabelReplace, - }, - { - // The port number is added after relabeling, so - // this relabel rule should have no effect. - SourceLabels: model.LabelNames{model.AddressLabel}, - Regex: config.MustNewRegexp("example.com:80"), - Action: config.RelabelDrop, - }, - }, - } - testJob2 := &config.ScrapeConfig{ - JobName: "test_job2", - ScrapeInterval: model.Duration(1 * time.Minute), - TargetGroups: []*config.TargetGroup{ - { - Targets: []model.LabelSet{ - {model.AddressLabel: "example.org:8080"}, - {model.AddressLabel: "example.com:8081"}, - }, - Labels: model.LabelSet{ - "foo": "bar", - "boom": "box", - }, - }, - { - Targets: []model.LabelSet{ - {model.AddressLabel: "test.com:1234"}, - }, - }, - { - Targets: []model.LabelSet{ - {model.AddressLabel: "test.com:1235"}, - }, - Labels: model.LabelSet{"instance": "fixed"}, - }, - }, - RelabelConfigs: []*config.RelabelConfig{ - { - SourceLabels: model.LabelNames{model.AddressLabel}, - Regex: config.MustNewRegexp(`test\.(.*?):(.*)`), - Replacement: "foo.${1}:${2}", - TargetLabel: model.AddressLabel, - Action: config.RelabelReplace, - }, - { - // Add a new label for example.* targets. - SourceLabels: model.LabelNames{model.AddressLabel, "boom", "foo"}, - Regex: config.MustNewRegexp("example.*?-b([a-z-]+)r"), - TargetLabel: "new", - Replacement: "$1", - Separator: "-", - Action: config.RelabelReplace, - }, - { - // Drop an existing label. - SourceLabels: model.LabelNames{"boom"}, - Regex: config.MustNewRegexp(".*"), - TargetLabel: "boom", - Replacement: "", - Action: config.RelabelReplace, - }, - }, - } - // Test that targets without host:port addresses are dropped. 
- testJob3 := &config.ScrapeConfig{ - JobName: "test_job1", - ScrapeInterval: model.Duration(1 * time.Minute), - TargetGroups: []*config.TargetGroup{{ - Targets: []model.LabelSet{ - {model.AddressLabel: "example.net:80"}, - }, - }}, - RelabelConfigs: []*config.RelabelConfig{ - { - SourceLabels: model.LabelNames{model.AddressLabel}, - Regex: config.MustNewRegexp("(.*)"), - TargetLabel: "__address__", - Replacement: "http://$1", - Action: config.RelabelReplace, - }, - }, - } - - sequence := []struct { - scrapeConfigs []*config.ScrapeConfig - expected map[string][]model.LabelSet - }{ - { - scrapeConfigs: []*config.ScrapeConfig{testJob1}, - expected: map[string][]model.LabelSet{ - "test_job1:static:0:0": { - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.org:80", - model.ParamLabelPrefix + "testParam": "paramValue", - }, - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.com:80", - model.ParamLabelPrefix + "testParam": "paramValue"}, - }, - }, - }, { - scrapeConfigs: []*config.ScrapeConfig{testJob1}, - expected: map[string][]model.LabelSet{ - "test_job1:static:0:0": { - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.org:80", - model.ParamLabelPrefix + "testParam": "paramValue", - }, - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.com:80", - model.ParamLabelPrefix + "testParam": "paramValue", - }, - }, - }, - }, { - scrapeConfigs: []*config.ScrapeConfig{testJob1, testJob2}, - expected: map[string][]model.LabelSet{ - "test_job1:static:0:0": { - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.org:80", - model.ParamLabelPrefix + "testParam": "paramValue", - }, - { - model.JobLabel: "test_job1", - "testParam": "paramValue", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.com:80", - model.ParamLabelPrefix + "testParam": "paramValue", - }, - }, - "test_job2:static:0:0": { - { - model.JobLabel: "test_job2", - "foo": "bar", - "new": "ox-ba", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.org:8080", - }, - { - model.JobLabel: "test_job2", - "foo": "bar", - "new": "ox-ba", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.com:8081", - }, - }, - "test_job2:static:0:1": { - { - model.JobLabel: "test_job2", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "foo.com:1234", - }, - }, - "test_job2:static:0:2": { - { - model.JobLabel: "test_job2", - model.InstanceLabel: "fixed", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "foo.com:1235", - }, - }, - }, - }, { - scrapeConfigs: []*config.ScrapeConfig{}, - expected: map[string][]model.LabelSet{}, - }, { - scrapeConfigs: []*config.ScrapeConfig{testJob2}, - expected: map[string][]model.LabelSet{ - "test_job2:static:0:0": { - { - model.JobLabel: "test_job2", - "foo": "bar", - "new": "ox-ba", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "example.org:8080"}, - { - model.JobLabel: "test_job2", - "foo": "bar", - "new": "ox-ba", - model.SchemeLabel: "", - 
model.MetricsPathLabel: "", - model.AddressLabel: "example.com:8081", - }, - }, - "test_job2:static:0:1": { - { - model.JobLabel: "test_job2", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "foo.com:1234", - }, - }, - "test_job2:static:0:2": { - { - model.JobLabel: "test_job2", - model.InstanceLabel: "fixed", - model.SchemeLabel: "", - model.MetricsPathLabel: "", - model.AddressLabel: "foo.com:1235", - }, - }, - }, - }, { - scrapeConfigs: []*config.ScrapeConfig{testJob3}, - expected: map[string][]model.LabelSet{}, - }, - } - conf := &config.Config{} - *conf = config.DefaultConfig - - targetManager := NewTargetManager(nopAppender{}) - targetManager.ApplyConfig(conf) - - targetManager.Run() - defer targetManager.Stop() - - for i, step := range sequence { - conf.ScrapeConfigs = step.scrapeConfigs - targetManager.ApplyConfig(conf) - - time.Sleep(50 * time.Millisecond) - - if len(targetManager.targets) != len(step.expected) { - t.Fatalf("step %d: sources mismatch: expected %v, got %v", i, step.expected, targetManager.targets) - } - - for source, actTargets := range targetManager.targets { - expTargets, ok := step.expected[source] - if !ok { - t.Fatalf("step %d: unexpected source %q: %v", i, source, actTargets) - } - for _, expt := range expTargets { - found := false - for _, actt := range actTargets { - if reflect.DeepEqual(expt, actt.labels) { - found = true - break - } - } - if !found { - t.Errorf("step %d: expected target %v for %q not found in actual targets", i, expt, source) - } - } - } - } -} - -func TestHandleUpdatesReturnsWhenUpdateChanIsClosed(t *testing.T) { - tm := NewTargetManager(nopAppender{}) - ch := make(chan targetGroupUpdate) - close(ch) - tm.handleUpdates(ch, make(chan struct{})) -} diff --git a/web/ui/bindata.go b/web/ui/bindata.go index 43c15e3b7..beed94aac 100644 --- a/web/ui/bindata.go +++ b/web/ui/bindata.go @@ -159,7 +159,7 @@ func webUiTemplatesGraphHtml() (*asset, error) { return a, nil } -var _webUiTemplatesStatusHtml = 
[]byte("\x1f\x8b\x08\x00\x00\x09\x6e\x88\x00\xff\xcc\x57\xcd\x6e\xdc\x36\x10\xbe\xef\x53\xb0\x44\x8e\xd5\x2e\x10\xa0\x17\x63\x57\x07\x1b\x29\x1c\xc0\x29\xdc\xac\x7d\xe9\x25\xe0\x8a\xb3\x12\x5b\x9a\x14\x48\xca\xb5\xa1\xe8\xdd\x3b\x43\x49\x5e\xfd\x6d\x9a\x34\x68\xeb\xcb\x9a\x43\x0e\xe7\xe7\x9b\x6f\x46\x74\x5d\x4b\x38\x2a\x03\x8c\x17\x20\x24\x6f\x9a\xed\x0f\x49\xc2\x8c\x7a\x62\x49\x92\xd6\x35\x18\xd9\x34\xab\xd5\x49\x2b\xb3\x26\x80\x09\xa8\xb8\x62\x6c\x2b\xd5\x23\xcb\xb4\xf0\x7e\x17\x0f\x04\xaa\xb8\xe4\xa8\x2b\x25\x79\x8a\xe7\xa8\x51\xbc\x65\x4a\xee\xb8\xab\x4c\x50\x0f\xc0\xd3\x8f\xed\x82\xbd\x37\x47\xeb\x1e\x44\x50\xd6\x6c\x37\xc5\xdb\x4e\x3b\x88\x83\x86\xde\x62\x2b\xc4\xdf\x04\xad\x4b\x30\x1e\x64\x27\x1f\xac\x93\xe0\x5e\x44\x1f\x9c\x2a\x5f\xa4\xc2\x3e\x82\xeb\x02\x20\xa3\x07\x2b\x9f\x7b\x89\x64\x77\x12\x48\x2c\xd2\xfb\x92\x62\xda\x6e\x70\x39\x3a\x91\x88\xc0\x7a\x1f\x44\xa8\xfc\xfa\x52\xb9\x50\xac\xef\xef\xae\x10\xa2\x0d\x9e\x9c\xec\x6d\x4e\x06\x71\x7d\x72\x86\x02\x85\x93\xae\x46\x48\x1c\x2a\xa5\xa5\x3a\x65\xcf\xd3\x4b\xda\xf9\x1f\x01\xa9\x6b\x27\x4c\x0e\xec\xcd\x1f\xf0\xfc\x23\x7b\xf3\x28\x74\x05\xec\x62\xc7\xd6\x14\x52\xac\xf3\x39\xe0\x98\xcf\x6c\x09\x58\x5d\xfb\x27\x47\xa8\xc8\x40\x44\x67\x01\xc6\xd6\xec\x97\xb0\xa3\x40\x5a\xba\x7d\x35\x96\x08\xc2\x51\xe5\x95\xeb\x80\xbc\x1a\x8a\x03\x10\x4b\x07\x83\x42\xb6\x5a\x14\x09\xed\xaf\x26\x34\xd5\xe0\x89\xa4\xf8\x67\x66\xa0\x45\x29\x13\x5a\xb3\xde\x56\x54\x6c\x1a\x34\x7e\x7d\xf7\xe1\x66\x6f\x54\x59\x42\x60\xa5\x08\xc5\xad\xc3\x86\x79\x42\x2f\x07\xb7\xe9\xfb\x68\xc9\x63\x10\x2e\x87\x80\x3e\xef\xda\xc5\xc9\xeb\xbf\x54\xfd\x41\xbd\x7f\xb7\x07\xac\x77\x69\xad\xa6\x72\x8f\x12\x6b\xa3\xb9\xc5\x23\x3f\x60\x40\x2c\x3a\x8e\x89\x61\x79\x5b\x5e\x10\x19\x32\x54\x2e\x85\xd9\xf1\x9f\x78\x1f\x33\x7a\xf8\x44\x17\xc8\x3f\x72\x00\xc5\x8e\x1f\xe3\xc2\x2f\xb0\xab\x73\x96\xbe\x33\xb2\xb4\xca\x84\x29\xab\xfa\x73\x8a\x77\xd6\xb9\xfd\xe1\x8d\x38\x80\xf6\xe7\x4f\x7d\x60\xfb\xcc\x89\xf2\xac\x81\x77\xce\x59\x37\x3f\x9c\x46\x4f\x1a\x13\x58\xa6\x4d\x36\x80\x9d\x00\x1f\x81\x7a\x26\x79\x39\xdb\x12\xac\x40\x5a\xed\x38\xf2\xed\xfe\xe3\x0d\xfb\xcc\x72\x6d\x0f\x42\xe3\xba\x69\x08\x60\xda\x5d\xef\xb3\x02\x1e\xb0\xd3\x2e\x36\x9b\x6e\xe7\xda\xfa\x10\x49\x4a\xc2\x2d\x92\x93\x8a\x20\x52\xa4\xe6\xd4\xc3\x20\x4a\x4d\xd8\xf5\xe3\xc0\xc7\x79\x40\xd7\x7f\xad\xc0\x3d\xb3\x49\xf8\x93\xab\x6a\x38\x45\x3a\x03\x8b\x37\x30\x25\x62\x4c\xcf\x96\xe8\x92\xc5\xdf\xa4\x74\xea\x41\xb8\xe7\x48\x9b\xb8\xd3\x34\x94\x77\x3f\x46\xf8\x76\x43\x37\xe7\xf1\x4f\xa7\xc8\xdf\xed\x8f\xe7\xd1\x59\xe8\x27\x91\x0a\x0d\x2e\xb0\xf8\x9b\xd4\xf5\x4b\xd7\x5c\x83\xd0\xd8\x08\x9f\x59\x11\x17\x77\xf6\x8a\xd4\x11\x2d\xe6\x89\xa6\x9f\x94\x91\x2a\x13\xc1\x3a\x16\xe0\x29\x24\x15\x4e\x0b\x97\x09\x0f\x7c\x39\x8f\xb1\xd9\x85\x94\x96\x41\xf8\x67\x29\x65\x95\xf3\xd6\x25\xb1\xd9\xb0\x5d\x99\x14\x41\x24\xc1\xe6\xb9\xc6\x01\x1f\x90\xb2\x41\x95\x9c\x05\x15\x48\xee\x8e\xad\x53\xb9\x32\x42\x27\xdd\xf6\x25\xe0\x37\x0c\x98\x83\x58\x31\x65\xf2\x0b\xca\xe2\x03\x04\xd1\x76\x22\xb1\x74\x31\xd3\xb6\xc4\x91\x65\x71\x76\xb5\xea\x6c\xdd\xfd\xa5\x39\xc2\x19\x57\x06\x61\x34\x19\xf0\x2f\xd3\x6f\xc4\xdc\x48\x41\xdd\x79\xff\x0f\x29\xa8\x3d\x7c\xab\x3f\x7c\x62\x89\x4a\x07\x9e\x1a\x6b\xe0\xdb\xf9\xfd\x9d\x64\xa8\x6b\x75\x7c\x21\x32\x8d\xc6\x76\x32\xae\xdf\xfb\xdf\xc0\xe1\x33\xe0\x17\xc0\xaf\x48\x9f\x58\x5d\x7b\x85\x85\x58\xd0\x47\xae\x8b\xdc\x7e\x67\xaf\xcd\x62\x89\x73\x78\x29\xe7\x73\x4d\x29\x89\x0a\x6e\xda\x76\x7c\xf0\x0c\x18\x98\x3d\x87\xf5\xd7\x66\x31\xfd\x1c\xcc\xef\x8d\xde\x32\x73\x95\xe5\xd7\x0d\x06\xef\x42\x55\x1e\xb5\xc8\xf1\x7d\xb0\x6f\x25\xf6\x33\x8
9\xaf\xe5\x85\xd8\x61\x19\x63\x7a\x6d\x2f\x45\x5a\xe2\xff\x27\xe9\xaa\x57\xfe\x2b\x00\x00\xff\xff\xb2\x36\x91\x1f\xeb\x0c\x00\x00") +var _webUiTemplatesStatusHtml = []byte("\x1f\x8b\x08\x00\x00\x09\x6e\x88\x00\xff\xcc\x57\xcd\x6e\xdc\x36\x10\xbe\xef\x53\xb0\x44\x8e\xd5\x2e\x10\xa0\x17\x63\x57\x07\x1b\x29\x1c\xc0\x29\xdc\xac\x7d\xe9\x25\xe0\x8a\xb3\x12\x5b\x9a\x14\x48\xca\xf5\x42\xd1\xbb\x77\x86\x92\xa2\x9f\xd5\xa6\x49\x83\xb6\xb9\xac\x39\xe4\x70\x7e\xbe\xf9\x66\x44\xd7\xb5\x84\xa3\x32\xc0\x78\x01\x42\xf2\xa6\xd9\xfe\x90\x24\xcc\xa8\x17\x96\x24\x69\x5d\x83\x91\x4d\xb3\x5a\x0d\x5a\x99\x35\x01\x4c\x40\xc5\x15\x63\x5b\xa9\x9e\x59\xa6\x85\xf7\xbb\x78\x20\x50\xc5\x25\x47\x5d\x29\xc9\x53\x3c\x47\x8d\xe2\x35\x53\x72\xc7\x5d\x65\x82\x7a\x02\x9e\xbe\x6f\x17\xec\xad\x39\x5a\xf7\x24\x82\xb2\x66\xbb\x29\x5e\x77\xda\x41\x1c\x34\xf4\x16\x5b\x21\xfe\x26\x68\x5d\x82\xf1\x20\x3b\xf9\x60\x9d\x04\xf7\x49\xf4\xc1\xa9\xf2\x93\x54\xd8\x67\x70\x5d\x00\x64\xf4\x60\xe5\xa9\x97\x48\x76\x83\x40\x62\x91\x3e\x96\x14\xd3\x76\x83\xcb\xc9\x89\x44\x04\xd6\xfb\x20\x42\xe5\xd7\xd7\xca\x85\x62\xfd\xf8\x70\x83\x10\x6d\xf0\x64\xb0\xb7\x19\x0c\xe2\x7a\x70\x86\x02\x85\x93\xae\x26\x48\x1c\x2a\xa5\xa5\x1a\xb2\xe7\xe9\x35\xed\xfc\x8f\x80\xd4\xb5\x13\x26\x07\xf6\xea\x0f\x38\xfd\xc8\x5e\x3d\x0b\x5d\x01\xbb\xda\xb1\x35\x85\x14\xeb\x7c\x09\x38\xe6\x33\x5b\x02\x56\xd7\xfe\xc9\x11\x2a\x32\x10\xd1\x59\x80\xb1\x35\xfb\x39\xec\x28\x90\x96\x6e\x5f\x8c\x25\x82\x70\x54\x79\xe5\x3a\x20\x6f\xc6\xe2\x08\xc4\xd2\xc1\xa8\x90\xad\x16\x45\x42\xfb\xab\x19\x4d\x35\x78\x22\x29\xfe\x39\x33\xd0\xa2\x94\x09\xad\x59\x6f\x2b\x2a\x36\x0d\x1a\xbf\x7d\x78\x77\xb7\x37\xaa\x2c\x21\xb0\x52\x84\xe2\xde\x61\xc3\xbc\xa0\x97\x83\xdb\xf4\x7d\xb4\xe4\x31\x08\x97\x43\x40\x9f\x0f\xed\x62\xf0\xfa\x2f\x55\x7f\x54\xef\xdf\xed\x01\xeb\x5d\x5a\xab\xa9\xdc\x93\xc4\xda\x68\xee\xf1\xc8\x8f\x18\x10\x8b\x8e\x63\x62\x5c\xde\x96\x17\x44\x86\x0c\x95\x4b\x61\x76\xfc\x27\xde\xc7\x8c\x1e\x3e\xd0\x05\xf2\x8f\x1c\x40\xb1\xe3\xc7\xb4\xf0\x0b\xec\xea\x9c\xa5\x6f\x8c\x2c\xad\x32\x61\xce\xaa\xfe\x9c\xe2\x3d\xeb\xdc\xfe\xf0\x4e\x1c\x40\xfb\xcb\xa7\x3e\xb0\x7d\xe6\x44\x79\xd1\xc0\x1b\xe7\xac\x3b\x3f\x9c\x47\x4f\x1a\x33\x58\xe6\x4d\x36\x82\x9d\x00\x9f\x80\x7a\x21\x79\x79\xb6\x25\x58\x81\xb4\xda\x71\xe4\xdb\xe3\xfb\x3b\xf6\x91\xe5\xda\x1e\x84\xc6\x75\xd3\x10\xc0\xb4\xbb\xde\x67\x05\x3c\x61\xa7\x5d\x6d\x36\xdd\xce\xad\xf5\x21\x92\x94\x84\x7b\x24\x27\x15\x41\xa4\x48\xcd\xb9\x87\x51\x94\x9a\xb0\xeb\xc7\x81\x8f\xf3\x80\xae\xff\x5a\x81\x3b\xb1\x59\xf8\xb3\xab\x6a\x3c\x45\x3a\x03\x8b\x37\x30\x25\x62\x4c\xcf\x96\xe8\x92\xc5\xdf\xa4\x74\xea\x49\xb8\x53\xa4\x4d\xdc\x69\x1a\xca\xbb\x1f\x23\x7c\xbb\xa1\x9b\xe7\xf1\xcf\xa7\xc8\xdf\xed\x4f\xe7\xd1\x45\xe8\x67\x91\x0a\x0d\x2e\xb0\xf8\x9b\xd4\x35\x5b\xdf\x82\xd0\xd8\x01\x1f\x59\x11\x17\x0f\xf6\x86\xf4\x10\x26\xe6\x89\x9f\x1f\x94\x91\x2a\x13\xc1\x3a\x16\xe0\x25\x24\x15\x8e\x09\x97\x09\x0f\x7c\x39\x81\xce\xde\x42\x12\xcb\x69\xff\xb3\x24\xb2\xca\x79\xeb\x92\xd8\x5e\xd8\xa0\x4c\x8a\x20\x92\x60\xf3\x5c\xe3\x48\x0f\x48\xd2\xa0\x4a\xce\x82\x0a\x24\x77\xc7\xd6\xa9\x5c\x19\xa1\x93\x6e\xfb\x1a\xf0\xab\x05\xcc\x41\xac\x91\x32\xf9\x15\x85\xff\x0e\x82\x68\x7b\x8f\x78\xb9\x98\x62\x5b\xd4\xc8\xab\x38\xad\x5a\x75\xb6\xee\xfe\xd2\xe4\xe0\x8c\x2b\x83\xf8\x99\x0c\xf8\xe7\x09\x37\xe1\x6a\x24\x9d\xee\xbc\xff\x87\xa4\xd3\x1e\xbe\xd6\x1f\x3e\xaa\x44\xa5\x03\x4f\x8d\x35\xf0\xf5\x8c\xfe\x46\x32\xd4\xb5\x3a\x12\xe0\x3e\xb4\x43\x70\xfd\xd6\xff\x06\x0e\xbf\xf8\xbf\x00\x7e\x30\xfa\x8c\xea\xda\x2b\xac\xc0\x58\x11\x69\x2d\x72\xfb\x8d\xfd\x34\x78\x8f\x43\x76\x29\xbd\x4b\x1d\x27\xa9\xea\x6e\xde\x
5a\x71\x00\x8e\xec\x5d\xc2\xf3\x4b\xe3\x9e\x0f\xf9\xf3\x7b\x93\x17\xca\xb9\xca\xf2\x9b\x05\xa3\x76\xa1\x2a\x8f\x5a\xe4\xf8\xd5\xdf\xb7\x12\xfb\x99\xc4\xef\xe5\xdd\xd7\xbd\x01\x62\x4c\xdf\xdb\xfb\x8f\x96\xf8\x5f\x47\xba\xea\x95\xff\x0a\x00\x00\xff\xff\xd0\x50\xb7\x0a\xc1\x0c\x00\x00") func webUiTemplatesStatusHtmlBytes() ([]byte, error) { return bindataRead( @@ -174,7 +174,7 @@ func webUiTemplatesStatusHtml() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "web/ui/templates/status.html", size: 3307, mode: os.FileMode(420), modTime: time.Unix(1455530985, 0)} + info := bindataFileInfo{name: "web/ui/templates/status.html", size: 3265, mode: os.FileMode(420), modTime: time.Unix(1456687049, 0)} a := &asset{bytes: bytes, info: info} return a, nil } diff --git a/web/ui/templates/status.html b/web/ui/templates/status.html index 085566e0c..f28069008 100644 --- a/web/ui/templates/status.html +++ b/web/ui/templates/status.html @@ -55,8 +55,8 @@ {{end}} - - {{.Status.Health}} + + {{.Health}} @@ -70,11 +70,11 @@ - {{if .Status.LastScrape.IsZero}}Never{{else}}{{since .Status.LastScrape}} ago{{end}} + {{if .LastScrape.IsZero}}Never{{else}}{{since .LastScrape}} ago{{end}} - {{if .Status.LastError}} - {{.Status.LastError}} + {{if .LastError}} + {{.LastError}} {{end}}
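The status.html hunks above drop the ".Status." prefix from the template accessors, which implies the target values handed to the template now expose their health, last-scrape time, and last error directly rather than through a nested status struct. A minimal sketch of a target type that would satisfy the new template follows; the type name, method set, and health constants are illustrative assumptions, not the actual retrieval package API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// TargetHealth is a hypothetical health indicator for a scrape target.
type TargetHealth string

const (
	HealthUnknown TargetHealth = "unknown"
	HealthGood    TargetHealth = "up"
	HealthBad     TargetHealth = "down"
)

// Target sketches a scrape target whose status fields are exposed
// directly, matching the flattened template accessors above
// ({{.Health}}, {{.LastScrape}}, {{.LastError}}).
type Target struct {
	mtx        sync.RWMutex
	health     TargetHealth
	lastScrape time.Time
	lastError  error
}

// Health returns the target's most recent health state.
func (t *Target) Health() TargetHealth {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	return t.health
}

// LastScrape returns the time of the last scrape attempt.
func (t *Target) LastScrape() time.Time {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	return t.lastScrape
}

// LastError returns the error of the last scrape, if any.
func (t *Target) LastError() error {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	return t.lastError
}

func main() {
	t := &Target{health: HealthGood, lastScrape: time.Now()}
	fmt.Println(t.Health(), t.LastScrape().IsZero(), t.LastError())
}

With accessors like these, {{.Health}}, {{.LastScrape}}, and {{.LastError}} resolve directly on the target, and the synchronized getters keep template rendering race-free while scrapes update the same fields.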