Improve unique target group sources.

Include position of same SD mechanisms within the same scrape configuration.
Move unique prefixing out of SD implementations and target manager into
its own interface.
This commit is contained in:
Fabian Reinartz 2015-08-07 13:18:19 +02:00
parent 54202bc5a8
commit 0138d37458
10 changed files with 149 additions and 72 deletions

View file

@ -30,7 +30,6 @@ import (
)
const (
consulSourcePrefix = "consul"
consulWatchTimeout = 30 * time.Second
consulRetryInterval = 15 * time.Second
@ -127,7 +126,7 @@ func (cd *ConsulDiscovery) Sources() []string {
srcs := make([]string, 0, len(srvs))
for name := range srvs {
if _, ok := cd.scrapedServices[name]; ok {
srcs = append(srcs, consulSourcePrefix+":"+name)
srcs = append(srcs, name)
}
}
return srcs
@ -146,7 +145,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) {
return
case srv := <-update:
if srv.removed {
ch <- &config.TargetGroup{Source: consulSourcePrefix + ":" + srv.name}
ch <- &config.TargetGroup{Source: srv.name}
break
}
// Launch watcher for the service.
@ -221,7 +220,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) {
tgroup: &config.TargetGroup{},
done: make(chan struct{}, 1),
}
srv.tgroup.Source = consulSourcePrefix + ":" + name
srv.tgroup.Source = name
cd.services[name] = srv
}
srv.tgroup.Labels = clientmodel.LabelSet{

View file

@ -32,8 +32,7 @@ import (
const (
resolvConf = "/etc/resolv.conf"
dnsSourcePrefix = "dns"
DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name"
DNSNameLabel = clientmodel.MetaLabelPrefix + "dns_srv_name"
// Constants for instrumentation.
namespace = "prometheus"
@ -123,7 +122,7 @@ func (dd *DNSDiscovery) Stop() {
func (dd *DNSDiscovery) Sources() []string {
var srcs []string
for _, name := range dd.names {
srcs = append(srcs, dnsSourcePrefix+":"+name)
srcs = append(srcs, name)
}
return srcs
}
@ -174,7 +173,7 @@ func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) erro
})
}
tg.Source = dnsSourcePrefix + ":" + name
tg.Source = name
ch <- tg
return nil

View file

@ -195,7 +195,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
// fileSource returns a source ID for the i-th target group in the file.
func fileSource(filename string, i int) string {
return fmt.Sprintf("file:%s:%d", filename, i)
return fmt.Sprintf("%s:%d", filename, i)
}
// Stop implements the TargetProvider interface.

View file

@ -63,7 +63,7 @@ func testFileSD(t *testing.T, ext string) {
if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed")
}
if tg.String() != fmt.Sprintf("file:fixtures/_test%s:0", ext) {
if tg.String() != fmt.Sprintf("fixtures/_test%s:0", ext) {
t.Fatalf("Unexpected target group", tg)
}
}
@ -71,7 +71,7 @@ func testFileSD(t *testing.T, ext string) {
case <-time.After(15 * time.Second):
t.Fatalf("Expected new target group but got none")
case tg := <-ch:
if tg.String() != fmt.Sprintf("file:fixtures/_test%s:1", ext) {
if tg.String() != fmt.Sprintf("fixtures/_test%s:1", ext) {
t.Fatalf("Unexpected target group %s", tg)
}
}

View file

@ -67,7 +67,7 @@ func isValidApp(app *App) bool {
}
func targetGroupName(app *App) string {
return fmt.Sprintf("marathon:%s", sanitizeName(app.ID))
return sanitizeName(app.ID)
}
func sanitizeName(id string) string {

View file

@ -7,9 +7,10 @@ import (
const appListPath string = "/v2/apps/?embed=apps.tasks"
// RandomAppsURL randomly selects a server from an array and creates an URL pointing to the app list.
// RandomAppsURL randomly selects a server from an array and creates
// an URL pointing to the app list.
func RandomAppsURL(servers []string) string {
// TODO If possible update server list from Marathon at some point
// TODO: If possible update server list from Marathon at some point.
server := servers[rand.Intn(len(servers))]
return fmt.Sprintf("%s%s", server, appListPath)
}

View file

@ -84,7 +84,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
go func() {
select {
case tg := <-ch:
if tg.Source != "marathon:test-service" {
if tg.Source != "test-service" {
t.Fatalf("Wrong target group name: %s", tg.Source)
}
if len(tg.Targets) != 1 {

View file

@ -31,8 +31,6 @@ import (
)
const (
serversetSourcePrefix = "serverset"
serversetNodePrefix = "member_"
serversetLabelPrefix = clientmodel.MetaLabelPrefix + "serverset_"
@ -112,7 +110,7 @@ func (sd *ServersetDiscovery) processUpdates() {
defer sd.conn.Close()
for event := range sd.updates {
tg := &config.TargetGroup{
Source: serversetSourcePrefix + event.Path,
Source: event.Path,
}
sd.mu.Lock()
if event.Data != nil {

View file

@ -82,7 +82,6 @@ func (tm *TargetManager) Run() {
go tm.handleTargetUpdates(scfg, ch)
for _, src := range prov.Sources() {
src = fullSource(scfg, src)
sources[src] = struct{}{}
}
@ -118,14 +117,6 @@ func (tm *TargetManager) handleTargetUpdates(cfg *config.ScrapeConfig, ch <-chan
}
}
// fullSource prepends the unique job name to the source.
//
// Thus, oscilliating label sets for targets with the same source,
// but providers from different configs, are prevented.
func fullSource(cfg *config.ScrapeConfig, src string) string {
return cfg.JobName + ":" + src
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
tm.m.RLock()
@ -199,7 +190,6 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
if err != nil {
return err
}
src := fullSource(cfg, tgroup.Source)
tm.m.Lock()
defer tm.m.Unlock()
@ -208,7 +198,7 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
return nil
}
oldTargets, ok := tm.targets[src]
oldTargets, ok := tm.targets[tgroup.Source]
if ok {
var wg sync.WaitGroup
// Replace the old targets with the new ones while keeping the state
@ -259,9 +249,9 @@ func (tm *TargetManager) updateTargetGroup(tgroup *config.TargetGroup, cfg *conf
}
if len(newTargets) > 0 {
tm.targets[src] = newTargets
tm.targets[tgroup.Source] = newTargets
} else {
delete(tm.targets, src)
delete(tm.targets, tgroup.Source)
}
return nil
}
@ -298,7 +288,7 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
providers := map[*config.ScrapeConfig][]TargetProvider{}
for _, scfg := range cfg.ScrapeConfigs {
providers[scfg] = ProvidersFromConfig(scfg)
providers[scfg] = providersFromConfig(scfg)
}
tm.m.Lock()
@ -309,6 +299,76 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) bool {
return true
}
// prefixedTargetProvider wraps TargetProvider and prefixes source strings
// to make the sources unique across a configuration.
type prefixedTargetProvider struct {
TargetProvider
job string
mechanism string
idx int
}
func (tp *prefixedTargetProvider) prefix(src string) string {
return fmt.Sprintf("%s:%s:%d:%s", tp.job, tp.mechanism, tp.idx, src)
}
func (tp *prefixedTargetProvider) Sources() []string {
srcs := tp.TargetProvider.Sources()
for i, src := range srcs {
srcs[i] = tp.prefix(src)
}
return srcs
}
func (tp *prefixedTargetProvider) Run(ch chan<- *config.TargetGroup) {
defer close(ch)
ch2 := make(chan *config.TargetGroup)
go tp.TargetProvider.Run(ch2)
for tg := range ch2 {
tg.Source = tp.prefix(tg.Source)
ch <- tg
}
}
// providersFromConfig returns all TargetProviders configured in cfg.
func providersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
var providers []TargetProvider
app := func(mech string, i int, tp TargetProvider) {
providers = append(providers, &prefixedTargetProvider{
job: cfg.JobName,
mechanism: mech,
idx: i,
TargetProvider: tp,
})
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, discovery.NewDNSDiscovery(c))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, discovery.NewFileDiscovery(c))
}
for i, c := range cfg.ConsulSDConfigs {
app("consul", i, discovery.NewConsulDiscovery(c))
}
for i, c := range cfg.MarathonSDConfigs {
app("marathon", i, discovery.NewMarathonDiscovery(c))
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, discovery.NewServersetDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
app("static", 0, NewStaticProvider(cfg.TargetGroups))
}
return providers
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
tm.m.RLock()
@ -382,31 +442,6 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
return targets, nil
}
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
var providers []TargetProvider
for _, c := range cfg.DNSSDConfigs {
providers = append(providers, discovery.NewDNSDiscovery(c))
}
for _, c := range cfg.FileSDConfigs {
providers = append(providers, discovery.NewFileDiscovery(c))
}
for _, c := range cfg.ConsulSDConfigs {
providers = append(providers, discovery.NewConsulDiscovery(c))
}
for _, c := range cfg.ServersetSDConfigs {
providers = append(providers, discovery.NewServersetDiscovery(c))
}
for _, c := range cfg.MarathonSDConfigs {
providers = append(providers, discovery.NewMarathonDiscovery(c))
}
if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
}
return providers
}
// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
@ -416,7 +451,7 @@ type StaticProvider struct {
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups {
tg.Source = fmt.Sprintf("static:%d", i)
tg.Source = fmt.Sprintf("%d", i)
}
return &StaticProvider{
TargetGroups: groups,

View file

@ -25,6 +25,51 @@ import (
"github.com/prometheus/prometheus/config"
)
func TestPrefixedTargetProvider(t *testing.T) {
targetGroups := []*config.TargetGroup{
{
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1234"},
},
}, {
Targets: []clientmodel.LabelSet{
{clientmodel.AddressLabel: "test-1:1235"},
},
},
}
tp := &prefixedTargetProvider{
job: "job-x",
mechanism: "static",
idx: 123,
TargetProvider: NewStaticProvider(targetGroups),
}
expSources := []string{
"job-x:static:123:0",
"job-x:static:123:1",
}
if !reflect.DeepEqual(tp.Sources(), expSources) {
t.Fatalf("expected sources %v, got %v", expSources, tp.Sources())
}
ch := make(chan *config.TargetGroup)
go tp.Run(ch)
expGroup1 := *targetGroups[0]
expGroup2 := *targetGroups[1]
expGroup1.Source = "job-x:static:123:0"
expGroup2.Source = "job-x:static:123:1"
// The static target provider sends on the channel once per target group.
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup1) {
t.Fatalf("expected target group %v, got %v", expGroup1, tg)
}
if tg := <-ch; !reflect.DeepEqual(tg, &expGroup2) {
t.Fatalf("expected target group %v, got %v", expGroup2, tg)
}
}
func TestTargetManagerChan(t *testing.T) {
testJob1 := &config.ScrapeConfig{
JobName: "test_job1",
@ -65,7 +110,7 @@ func TestTargetManagerChan(t *testing.T) {
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
@ -82,12 +127,12 @@ func TestTargetManagerChan(t *testing.T) {
Labels: clientmodel.LabelSet{"group": "label"},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
},
"test_job1:src2": {
"src2": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1235", "group": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1235", "group": "label"},
@ -99,7 +144,7 @@ func TestTargetManagerChan(t *testing.T) {
Targets: []clientmodel.LabelSet{},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-2:1234", "label": "set"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
@ -115,7 +160,7 @@ func TestTargetManagerChan(t *testing.T) {
},
},
expected: map[string][]clientmodel.LabelSet{
"test_job1:src1": {
"src1": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-1:1234", "added": "label"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-3:1234"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "test-4:1234", "fancy": "label"},
@ -239,7 +284,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
{
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
@ -247,7 +292,7 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
@ -255,18 +300,18 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob1, testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job1:static:0": {
"test_job1:static:0:0": {
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.org:80", "testParam": "paramValue"},
{clientmodel.JobLabel: "test_job1", clientmodel.InstanceLabel: "example.com:80", "testParam": "paramValue"},
},
"test_job2:static:0": {
"test_job2:static:0:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"},
},
"test_job2:static:1": {
"test_job2:static:0:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"},
},
"test_job2:static:2": {
"test_job2:static:0:2": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"},
},
},
@ -276,14 +321,14 @@ func TestTargetManagerConfigUpdate(t *testing.T) {
}, {
scrapeConfigs: []*config.ScrapeConfig{testJob2},
expected: map[string][]clientmodel.LabelSet{
"test_job2:static:0": {
"test_job2:static:0:0": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.org:8080", "foo": "bar", "new": "ox-ba"},
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "example.com:8081", "foo": "bar", "new": "ox-ba"},
},
"test_job2:static:1": {
"test_job2:static:0:1": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "foo.com:1234"},
},
"test_job2:static:2": {
"test_job2:static:0:2": {
{clientmodel.JobLabel: "test_job2", clientmodel.InstanceLabel: "fixed"},
},
},