From 0138d37458567f63b20436d6bf3ac13e9ce4f8b7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 7 Aug 2015 13:18:19 +0200 Subject: [PATCH] Improve unique target group sources. Include position of same SD mechanisms within the same scrape configuration. Move unique prefixing out of SD implementations and target manager into its own interface. --- retrieval/discovery/consul.go | 7 +- retrieval/discovery/dns.go | 7 +- retrieval/discovery/file.go | 2 +- retrieval/discovery/file_test.go | 4 +- retrieval/discovery/marathon/conversion.go | 2 +- retrieval/discovery/marathon/url.go | 5 +- retrieval/discovery/marathon_test.go | 2 +- retrieval/discovery/serverset.go | 4 +- retrieval/targetmanager.go | 115 ++++++++++++++------- retrieval/targetmanager_test.go | 73 ++++++++++--- 10 files changed, 149 insertions(+), 72 deletions(-) diff --git a/retrieval/discovery/consul.go b/retrieval/discovery/consul.go index 77284394c..fc2645a84 100644 --- a/retrieval/discovery/consul.go +++ b/retrieval/discovery/consul.go @@ -30,7 +30,6 @@ import ( ) const ( - consulSourcePrefix = "consul" consulWatchTimeout = 30 * time.Second consulRetryInterval = 15 * time.Second @@ -127,7 +126,7 @@ func (cd *ConsulDiscovery) Sources() []string { srcs := make([]string, 0, len(srvs)) for name := range srvs { if _, ok := cd.scrapedServices[name]; ok { - srcs = append(srcs, consulSourcePrefix+":"+name) + srcs = append(srcs, name) } } return srcs @@ -146,7 +145,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) { return case srv := <-update: if srv.removed { - ch <- &config.TargetGroup{Source: consulSourcePrefix + ":" + srv.name} + ch <- &config.TargetGroup{Source: srv.name} break } // Launch watcher for the service. @@ -221,7 +220,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { tgroup: &config.TargetGroup{}, done: make(chan struct{}, 1), } - srv.tgroup.Source = consulSourcePrefix + ":" + name + srv.tgroup.Source = name cd.services[name] = srv } srv.tgroup.Labels = clientmodel.LabelSet{ diff --git a/retrieval/discovery/dns.go b/retrieval/discovery/dns.go index 70aa2a7c2..d9fbffbc1 100644 --- a/retrieval/discovery/dns.go +++ b/retrieval/discovery/dns.go @@ -32,8 +32,7 @@ import ( const ( resolvConf = "/etc/resolv.conf" - dnsSourcePrefix = "dns" - DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name" + DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name" // Constants for instrumentation. namespace = "prometheus" @@ -123,7 +122,7 @@ func (dd *DNSDiscovery) Stop() { func (dd *DNSDiscovery) Sources() []string { var srcs []string for _, name := range dd.names { - srcs = append(srcs, dnsSourcePrefix+":"+name) + srcs = append(srcs, name) } return srcs } @@ -174,7 +173,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro }) } - tg.Source = dnsSourcePrefix + ":" + name + tg.Source = name ch <- tg return nil diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index 25201dd77..4c0655f03 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -195,7 +195,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { // fileSource returns a source ID for the i-th target group in the file. func fileSource(filename string, i int) string { - return fmt.Sprintf("file:%s:%d", filename, i) + return fmt.Sprintf("%s:%d", filename, i) } // Stop implements the TargetProvider interface. diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 89de0cb0a..6d1f00c44 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -63,7 +63,7 @@ func testFileSD(t *testing.T, ext string) { if _, ok := tg.Labels["foo"]; !ok { t.Fatalf("Label not parsed") } - if tg.String() != fmt.Sprintf("file:fixtures/_test%s:0", ext) { + if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) { t.Fatalf("Unexpected target group", tg) } } @@ -71,7 +71,7 @@ func testFileSD(t *testing.T, ext string) { case <-time.After(15 * time.Second): t.Fatalf("Expected new target group but got none") case tg := <-ch: - if tg.String() != fmt.Sprintf("file:fixtures/_test%s:1", ext) { + if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) { t.Fatalf("Unexpected target group %s", tg) } } diff --git a/retrieval/discovery/marathon/conversion.go b/retrieval/discovery/marathon/conversion.go index eace6cb2d..c6405e2f7 100644 --- a/retrieval/discovery/marathon/conversion.go +++ b/retrieval/discovery/marathon/conversion.go @@ -67,7 +67,7 @@ func isValidApp(app *App) bool { } func targetGroupName(app *App) string { - return fmt.Sprintf("marathon:%s", sanitizeName(app.ID)) + return sanitizeName(app.ID) } func sanitizeName(id string) string { diff --git a/retrieval/discovery/marathon/url.go b/retrieval/discovery/marathon/url.go index 61f5041f9..6fa49ae4d 100644 --- a/retrieval/discovery/marathon/url.go +++ b/retrieval/discovery/marathon/url.go @@ -7,9 +7,10 @@ import ( const appListPath string = "/v2/apps/?embed=apps.tasks" -// RandomAppsURL randomly selects a server from an array and creates an URL pointing to the app list. +// RandomAppsURL randomly selects a server from an array and creates +// an URL pointing to the app list. func RandomAppsURL(servers []string) string { - // TODO If possible update server list from Marathon at some point + // TODO: If possible update server list from Marathon at some point. server := servers[rand.Intn(len(servers))] return fmt.Sprintf("%s%s", server, appListPath) } diff --git a/retrieval/discovery/marathon_test.go b/retrieval/discovery/marathon_test.go index 7cf6b74b6..fcf11562c 100644 --- a/retrieval/discovery/marathon_test.go +++ b/retrieval/discovery/marathon_test.go @@ -84,7 +84,7 @@ func TestMarathonSDSendGroup(t *testing.T) { go func() { select { case tg := <-ch: - if tg.Source != "marathon:test-service" { + if tg.Source != "test-service" { t.Fatalf("Wrong target group name: %s", tg.Source) } if len(tg.Targets) != 1 { diff --git a/retrieval/discovery/serverset.go b/retrieval/discovery/serverset.go index 874855f2d..7cc38df6a 100644 --- a/retrieval/discovery/serverset.go +++ b/retrieval/discovery/serverset.go @@ -31,8 +31,6 @@ import ( ) const ( - serversetSourcePrefix = "serverset" - serversetNodePrefix = "member_" serversetLabelPrefix = clientmodel.MetaLabelPrefix + "serverset_" @@ -112,7 +110,7 @@ func (sd *ServersetDiscovery) processUpdates() { defer sd.conn.Close() for event := range sd.updates { tg := &config.TargetGroup{ - Source: serversetSourcePrefix + event.Path, + Source: event.Path, } sd.mu.Lock() if event.Data != nil { diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 1474169a0..1643af2b7 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -82,7 +82,6 @@ func (tm *TargetManager) Run() { go tm.handleTargetUpdates(scfg, ch) for _, src := range prov.Sources() { - src = fullSource(scfg, src) sources[src] = struct{}{} } @@ -118,14 +117,6 @@ func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan } } -// fullSource prepends the unique job name to the source. -// -// Thus, oscilliating label sets for targets with the same source, -// but providers from different configs, are prevented. -func fullSource(cfg *config.ScrapeConfig, src string) string { - return cfg.JobName + ":" + src -} - // Stop all background processing. func (tm *TargetManager) Stop() { tm.m.RLock() @@ -199,7 +190,6 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf if err != nil { return err } - src := fullSource(cfg, tgroup.Source) tm.m.Lock() defer tm.m.Unlock() @@ -208,7 +198,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf return nil } - oldTargets, ok := tm.targets[src] + oldTargets, ok := tm.targets[tgroup.Source] if ok { var wg sync.WaitGroup // Replace the old targets with the new ones while keeping the state @@ -259,9 +249,9 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf } if len(newTargets) > 0 { - tm.targets[src] = newTargets + tm.targets[tgroup.Source] = newTargets } else { - delete(tm.targets, src) + delete(tm.targets, tgroup.Source) } return nil } @@ -298,7 +288,7 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { providers := map[*config.ScrapeConfig][]TargetProvider{} for _, scfg := range cfg.ScrapeConfigs { - providers[scfg] = ProvidersFromConfig(scfg) + providers[scfg] = providersFromConfig(scfg) } tm.m.Lock() @@ -309,6 +299,76 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool { return true } +// prefixedTargetProvider wraps TargetProvider and prefixes source strings +// to make the sources unique across a configuration. +type prefixedTargetProvider struct { + TargetProvider + + job string + mechanism string + idx int +} + +func (tp *prefixedTargetProvider) prefix(src string) string { + return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src) +} + +func (tp *prefixedTargetProvider) Sources() []string { + srcs := tp.TargetProvider.Sources() + for i, src := range srcs { + srcs[i] = tp.prefix(src) + } + + return srcs +} + +func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) { + defer close(ch) + + ch2 := make(chan *config.TargetGroup) + go tp.TargetProvider.Run(ch2) + + for tg := range ch2 { + tg.Source = tp.prefix(tg.Source) + ch <- tg + } +} + +// providersFromConfig returns all TargetProviders configured in cfg. +func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { + var providers []TargetProvider + + app := func(mech string, i int, tp TargetProvider) { + providers = append(providers, &prefixedTargetProvider{ + job: cfg.JobName, + mechanism: mech, + idx: i, + TargetProvider: tp, + }) + } + + for i, c := range cfg.DNSSDConfigs { + app("dns", i, discovery.NewDNSDiscovery(c)) + } + for i, c := range cfg.FileSDConfigs { + app("file", i, discovery.NewFileDiscovery(c)) + } + for i, c := range cfg.ConsulSDConfigs { + app("consul", i, discovery.NewConsulDiscovery(c)) + } + for i, c := range cfg.MarathonSDConfigs { + app("marathon", i, discovery.NewMarathonDiscovery(c)) + } + for i, c := range cfg.ServersetSDConfigs { + app("serverset", i, discovery.NewServersetDiscovery(c)) + } + if len(cfg.TargetGroups) > 0 { + app("static", 0, NewStaticProvider(cfg.TargetGroups)) + } + + return providers +} + // targetsFromGroup builds targets based on the given TargetGroup and config. func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { tm.m.RLock() @@ -382,31 +442,6 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc return targets, nil } -// ProvidersFromConfig returns all TargetProviders configured in cfg. -func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { - var providers []TargetProvider - - for _, c := range cfg.DNSSDConfigs { - providers = append(providers, discovery.NewDNSDiscovery(c)) - } - for _, c := range cfg.FileSDConfigs { - providers = append(providers, discovery.NewFileDiscovery(c)) - } - for _, c := range cfg.ConsulSDConfigs { - providers = append(providers, discovery.NewConsulDiscovery(c)) - } - for _, c := range cfg.ServersetSDConfigs { - providers = append(providers, discovery.NewServersetDiscovery(c)) - } - for _, c := range cfg.MarathonSDConfigs { - providers = append(providers, discovery.NewMarathonDiscovery(c)) - } - if len(cfg.TargetGroups) > 0 { - providers = append(providers, NewStaticProvider(cfg.TargetGroups)) - } - return providers -} - // StaticProvider holds a list of target groups that never change. type StaticProvider struct { TargetGroups []*config.TargetGroup @@ -416,7 +451,7 @@ type StaticProvider struct { // target groups. func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { for i, tg := range groups { - tg.Source = fmt.Sprintf("static:%d", i) + tg.Source = fmt.Sprintf("%d", i) } return &StaticProvider{ TargetGroups: groups, diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 07c415745..a911ce8df 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -25,6 +25,51 @@ import ( "github.com/prometheus/prometheus/config" ) +func TestPrefixedTargetProvider(t *testing.T) { + targetGroups := []*config.TargetGroup{ + { + Targets: []clientmodel.LabelSet{ + {clientmodel.AddressLabel: "test-1:1234"}, + }, + }, { + Targets: []clientmodel.LabelSet{ + {clientmodel.AddressLabel: "test-1:1235"}, + }, + }, + } + + tp := &prefixedTargetProvider{ + job: "job-x", + mechanism: "static", + idx: 123, + TargetProvider: NewStaticProvider(targetGroups), + } + + expSources := []string{ + "job-x:static:123:0", + "job-x:static:123:1", + } + if !reflect.DeepEqual(tp.Sources(), expSources) { + t.Fatalf("expected sources %v, got %v", expSources, tp.Sources()) + } + + ch := make(chan *config.TargetGroup) + go tp.Run(ch) + + expGroup1 := *targetGroups[0] + expGroup2 := *targetGroups[1] + expGroup1.Source = "job-x:static:123:0" + expGroup2.Source = "job-x:static:123:1" + + // The static target provider sends on the channel once per target group. + if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) { + t.Fatalf("expected target group %v, got %v", expGroup1, tg) + } + if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) { + t.Fatalf("expected target group %v, got %v", expGroup2, tg) + } +} + func TestTargetManagerChan(t *testing.T) { testJob1 := &config.ScrapeConfig{ JobName: "test_job1", @@ -65,7 +110,7 @@ func TestTargetManagerChan(t *testing.T) { }, }, expected: map[string][]clientmodel.LabelSet{ - "test_job1:src1": { + "src1": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, @@ -82,12 +127,12 @@ func TestTargetManagerChan(t *testing.T) { Labels: clientmodel.LabelSet{"group": "label"}, }, expected: map[string][]clientmodel.LabelSet{ - "test_job1:src1": { + "src1": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, }, - "test_job1:src2": { + "src2": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1235", "group": "label"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1235", "group": "label"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1235", "group": "label"}, @@ -99,7 +144,7 @@ func TestTargetManagerChan(t *testing.T) { Targets: []clientmodel.LabelSet{}, }, expected: map[string][]clientmodel.LabelSet{ - "test_job1:src1": { + "src1": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, @@ -115,7 +160,7 @@ func TestTargetManagerChan(t *testing.T) { }, }, expected: map[string][]clientmodel.LabelSet{ - "test_job1:src1": { + "src1": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234", "added": "label"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-4:1234", "fancy": "label"}, @@ -239,7 +284,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { { scrapeConfigs: []*config.ScrapeConfig{testJob1}, expected: map[string][]clientmodel.LabelSet{ - "test_job1:static:0": { + "test_job1:static:0:0": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"}, }, @@ -247,7 +292,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) { }, { scrapeConfigs: []*config.ScrapeConfig{testJob1}, expected: map[string][]clientmodel.LabelSet{ - "test_job1:static:0": { + "test_job1:static:0:0": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"}, }, @@ -255,18 +300,18 @@ func TestTargetManagerConfigUpdate(t *testing.T) { }, { scrapeConfigs: []*config.ScrapeConfig{testJob1, testJob2}, expected: map[string][]clientmodel.LabelSet{ - "test_job1:static:0": { + "test_job1:static:0:0": { {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"}, {clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"}, }, - "test_job2:static:0": { + "test_job2:static:0:0": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"}, {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"}, }, - "test_job2:static:1": { + "test_job2:static:0:1": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"}, }, - "test_job2:static:2": { + "test_job2:static:0:2": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"}, }, }, @@ -276,14 +321,14 @@ func TestTargetManagerConfigUpdate(t *testing.T) { }, { scrapeConfigs: []*config.ScrapeConfig{testJob2}, expected: map[string][]clientmodel.LabelSet{ - "test_job2:static:0": { + "test_job2:static:0:0": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"}, {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"}, }, - "test_job2:static:1": { + "test_job2:static:0:1": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"}, }, - "test_job2:static:2": { + "test_job2:static:0:2": { {clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"}, }, },