Merge pull request #970 from prometheus/fabxc/target-src

Improve unique target group sources.
This commit is contained in:
Fabian Reinartz 2015-08-10 11:31:33 +02:00
commit c672973788
10 changed files with 149 additions and 72 deletions

View file

@ -30,7 +30,6 @@ import (
)
const (
consulSourcePrefix = "consul"
consulWatchTimeout = 30 * time.Second
consulRetryInterval = 15 * time.Second
@ -127,7 +126,7 @@ func (cd *ConsulDiscovery) Sources() []string {
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; ok {
srcs = append(srcs, consulSourcePrefix+":"+name)
srcs = append(srcs, name)
}
}
return srcs
@ -146,7 +145,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
return
case srv := <-update:
if srv.removed {
ch <- &config.TargetGroup{Source: consulSourcePrefix + ":" + srv.name}
ch <- &config.TargetGroup{Source: srv.name}
break
}
// Launch watcher for the service.
@ -221,7 +220,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
tgroup: &config.TargetGroup{},
done: make(chan struct{}, 1),
}
srv.tgroup.Source = consulSourcePrefix + ":" + name
srv.tgroup.Source = name
cd.services[name] = srv
}
srv.tgroup.Labels = clientmodel.LabelSet{

View file

@ -32,8 +32,7 @@ import (
const (
resolvConf = "/etc/resolv.conf"
dnsSourcePrefix = "dns"
DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name"
DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name"
// Constants for instrumentation.
namespace = "prometheus"
@ -123,7 +122,7 @@ func (dd *DNSDiscovery) Stop() {
func (dd *DNSDiscovery) Sources() []string {
var srcs []string
for _, name := range dd.names {
srcs = append(srcs, dnsSourcePrefix+":"+name)
srcs = append(srcs, name)
}
return srcs
}
@ -174,7 +173,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro
})
}
tg.Source = dnsSourcePrefix + ":" + name
tg.Source = name
ch <- tg
return nil

View file

@ -195,7 +195,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
// fileSource returns a source ID for the i-th target group in the file.
func fileSource(filename string, i int) string {
return fmt.Sprintf("file:%s:%d", filename, i)
return fmt.Sprintf("%s:%d", filename, i)
}
// Stop implements the TargetProvider interface.

View file

@ -63,7 +63,7 @@ func testFileSD(t *testing.T, ext string) {
if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed")
}
if tg.String() != fmt.Sprintf("file:fixtures/_test%s:0", ext) {
if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) {
t.Fatalf("Unexpected target group", tg)
}
}
@ -71,7 +71,7 @@ func testFileSD(t *testing.T, ext string) {
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if tg.String() != fmt.Sprintf("file:fixtures/_test%s:1", ext) {
if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) {
t.Fatalf("Unexpected target group %s", tg)
}
}

View file

@ -67,7 +67,7 @@ func isValidApp(app *App) bool {
}
func targetGroupName(app *App) string {
return fmt.Sprintf("marathon:%s", sanitizeName(app.ID))
return sanitizeName(app.ID)
}
func sanitizeName(id string) string {

View file

@ -7,9 +7,10 @@ import (
const appListPath string = "/v2/apps/?embed=apps.tasks"
// RandomAppsURL randomly selects a server from an array and creates an URL pointing to the app list.
// RandomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list.
func RandomAppsURL(servers []string) string {
// TODO If possible update server list from Marathon at some point
// TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath)
}

View file

@ -84,7 +84,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
go func() {
select {
case tg := <-ch:
if tg.Source != "marathon:test-service" {
if tg.Source != "test-service" {
t.Fatalf("Wrong target group name: %s", tg.Source)
}
if len(tg.Targets) != 1 {

View file

@ -31,8 +31,6 @@ import (
)
const (
serversetSourcePrefix = "serverset"
serversetNodePrefix = "member_"
serversetLabelPrefix = clientmodel.MetaLabelPrefix + "serverset_"
@ -112,7 +110,7 @@ func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close()
for event := range sd.updates {
tg := &config.TargetGroup{
Source: serversetSourcePrefix + event.Path,
Source: event.Path,
}
sd.mu.Lock()
if event.Data != nil {

View file

@ -82,7 +82,6 @@ func (tm *TargetManager) Run() {
go tm.handleTargetUpdates(scfg, ch)
for _, src := range prov.Sources() {
src = fullSource(scfg, src)
sources[src] = struct{}{}
}
@ -118,14 +117,6 @@ func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan
}
}
// fullSource prepends the unique job name to the source.
//
// Thus, oscilliating label sets for targets with the same source,
// but providers from different configs, are prevented.
func fullSource(cfg *config.ScrapeConfig, src string) string {
return cfg.JobName + ":" + src
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
tm.m.RLock()
@ -199,7 +190,6 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
if err != nil {
return err
}
src := fullSource(cfg, tgroup.Source)
tm.m.Lock()
defer tm.m.Unlock()
@ -208,7 +198,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
return nil
}
oldTargets, ok := tm.targets[src]
oldTargets, ok := tm.targets[tgroup.Source]
if ok {
var wg sync.WaitGroup
// Replace the old targets with the new ones while keeping the state
@ -259,9 +249,9 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
}
if len(newTargets) > 0 {
tm.targets[src] = newTargets
tm.targets[tgroup.Source] = newTargets
} else {
delete(tm.targets, src)
delete(tm.targets, tgroup.Source)
}
return nil
}
@ -298,7 +288,7 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
providers := map[*config.ScrapeConfig][]TargetProvider{}
for _, scfg := range cfg.ScrapeConfigs {
providers[scfg] = ProvidersFromConfig(scfg)
providers[scfg] = providersFromConfig(scfg)
}
tm.m.Lock()
@ -309,6 +299,76 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
return true
}
// prefixedTargetProvider wraps TargetProvider and prefixes source strings
// to make the sources unique across a configuration.
type prefixedTargetProvider struct {
TargetProvider
job string
mechanism string
idx int
}
func (tp *prefixedTargetProvider) prefix(src string) string {
return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src)
}
func (tp *prefixedTargetProvider) Sources() []string {
srcs := tp.TargetProvider.Sources()
for i, src := range srcs {
srcs[i] = tp.prefix(src)
}
return srcs
}
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) {
defer close(ch)
ch2 := make(chan *config.TargetGroup)
go tp.TargetProvider.Run(ch2)
for tg := range ch2 {
tg.Source = tp.prefix(tg.Source)
ch <- tg
}
}
// providersFromConfig returns all TargetProviders configured in cfg.
func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
var providers []TargetProvider
app := func(mech string, i int, tp TargetProvider) {
providers = append(providers, &prefixedTargetProvider{
job: cfg.JobName,
mechanism: mech,
idx: i,
TargetProvider: tp,
})
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, discovery.NewDNSDiscovery(c))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, discovery.NewFileDiscovery(c))
}
for i, c := range cfg.ConsulSDConfigs {
app("consul", i, discovery.NewConsulDiscovery(c))
}
for i, c := range cfg.MarathonSDConfigs {
app("marathon", i, discovery.NewMarathonDiscovery(c))
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, discovery.NewServersetDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
app("static", 0, NewStaticProvider(cfg.TargetGroups))
}
return providers
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
tm.m.RLock()
@ -382,31 +442,6 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
return targets, nil
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
var providers []TargetProvider
for _, c := range cfg.DNSSDConfigs {
providers = append(providers, discovery.NewDNSDiscovery(c))
}
for _, c := range cfg.FileSDConfigs {
providers = append(providers, discovery.NewFileDiscovery(c))
}
for _, c := range cfg.ConsulSDConfigs {
providers = append(providers, discovery.NewConsulDiscovery(c))
}
for _, c := range cfg.ServersetSDConfigs {
providers = append(providers, discovery.NewServersetDiscovery(c))
}
for _, c := range cfg.MarathonSDConfigs {
providers = append(providers, discovery.NewMarathonDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
}
return providers
}
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
@ -416,7 +451,7 @@ type StaticProvider struct {
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups {
tg.Source = fmt.Sprintf("static:%d", i)
tg.Source = fmt.Sprintf("%d", i)
}
return &StaticProvider{
TargetGroups: groups,

View file

@ -25,6 +25,51 @@ import (
"github.com/prometheus/prometheus/config"
)
func TestPrefixedTargetProvider(t *testing.T) {
targetGroups := []*config.TargetGroup{
{
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1234"},
},
}, {
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1235"},
},
},
}
tp := &prefixedTargetProvider{
job: "job-x",
mechanism: "static",
idx: 123,
TargetProvider: NewStaticProvider(targetGroups),
}
expSources := []string{
"job-x:static:123:0",
"job-x:static:123:1",
}
if !reflect.DeepEqual(tp.Sources(), expSources) {
t.Fatalf("expected sources %v, got %v", expSources, tp.Sources())
}
ch := make(chan *config.TargetGroup)
go tp.Run(ch)
expGroup1 := *targetGroups[0]
expGroup2 := *targetGroups[1]
expGroup1.Source = "job-x:static:123:0"
expGroup2.Source = "job-x:static:123:1"
// The static target provider sends on the channel once per target group.
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) {
t.Fatalf("expected target group %v, got %v", expGroup1, tg)
}
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) {
t.Fatalf("expected target group %v, got %v", expGroup2, tg)
}
}
func TestTargetManagerChan(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
@ -65,7 +110,7 @@ func TestTargetManagerChan(t *testing.T) {
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
@ -82,12 +127,12 @@ func TestTargetManagerChan(t *testing.T) {
Labels: clientmodel.LabelSet{"group": "label"},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
},
"test_job1:src2": {
"src2": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1235", "group": "label"},
@ -99,7 +144,7 @@ func TestTargetManagerChan(t *testing.T) {
Targets: []clientmodel.LabelSet{},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
@ -115,7 +160,7 @@ func TestTargetManagerChan(t *testing.T) {
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234", "added": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-4:1234", "fancy": "label"},
@ -239,7 +284,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
{
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
@ -247,7 +292,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
@ -255,18 +300,18 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1, testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
"test_job2:static:0": {
"test_job2:static:0:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"},
},
"test_job2:static:1": {
"test_job2:static:0:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"},
},
"test_job2:static:2": {
"test_job2:static:0:2": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"},
},
},
@ -276,14 +321,14 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job2:static:0": {
"test_job2:static:0:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"},
},
"test_job2:static:1": {
"test_job2:static:0:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"},
},
"test_job2:static:2": {
"test_job2:static:0:2": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"},
},
},