diff --git a/discovery/discovery.go b/discovery/discovery.go index 715681147..628ebb71f 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -15,6 +15,8 @@ package discovery import ( "fmt" + "sync" + "time" "github.com/prometheus/common/log" "github.com/prometheus/prometheus/config" @@ -37,12 +39,12 @@ import ( // The TargetProvider does not have to guarantee that an actual change happened. // It does guarantee that it sends the new TargetGroup whenever a change happens. // -// Providers must initially send all known target groups as soon as it can. +// TargetProviders should initially send a full set of all discoverable TargetGroups. type TargetProvider interface { // Run hands a channel to the target provider through which it can send - // updated target groups. The channel must be closed by the target provider - // if no more updates will be sent. - // On receiving from done Run must return. + // updated target groups. + // Must returns if the context gets canceled. It should not close the update + // channel on returning. Run(ctx context.Context, up chan<- []*config.TargetGroup) } @@ -135,3 +137,166 @@ func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGro } close(ch) } + +// TargetSet handles multiple TargetProviders and sends a full overview of their +// discovered TargetGroups to a Syncer. +type TargetSet struct { + mtx sync.RWMutex + // Sets of targets by a source string that is unique across target providers. + tgroups map[string]*config.TargetGroup + + syncer Syncer + + syncCh chan struct{} + providerCh chan map[string]TargetProvider + cancelProviders func() +} + +// Syncer receives updates complete sets of TargetGroups. +type Syncer interface { + Sync([]*config.TargetGroup) +} + +// NewTargetSet returns a new target sending TargetGroups to the Syncer. +func NewTargetSet(s Syncer) *TargetSet { + return &TargetSet{ + syncCh: make(chan struct{}, 1), + providerCh: make(chan map[string]TargetProvider), + syncer: s, + } +} + +// Run starts the processing of target providers and their updates. +// It blocks until the context gets canceled. +func (ts *TargetSet) Run(ctx context.Context) { +Loop: + for { + // Throttle syncing to once per five seconds. + select { + case <-ctx.Done(): + break Loop + case p := <-ts.providerCh: + ts.updateProviders(ctx, p) + case <-time.After(5 * time.Second): + } + + select { + case <-ctx.Done(): + break Loop + case <-ts.syncCh: + ts.sync() + case p := <-ts.providerCh: + ts.updateProviders(ctx, p) + } + } +} + +func (ts *TargetSet) sync() { + ts.mtx.RLock() + var all []*config.TargetGroup + for _, tg := range ts.tgroups { + all = append(all, tg) + } + ts.mtx.RUnlock() + + ts.syncer.Sync(all) +} + +// UpdateProviders sets new target providers for the target set. +func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) { + ts.providerCh <- p +} + +func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) { + // Lock for the entire time. This may mean up to 5 seconds until the full initial set + // is retrieved and applied. + // We could release earlier with some tweaks, but this is easier to reason about. + ts.mtx.Lock() + defer ts.mtx.Unlock() + + // Stop all previous target providers of the target set. + if ts.cancelProviders != nil { + ts.cancelProviders() + } + ctx, ts.cancelProviders = context.WithCancel(ctx) + + var wg sync.WaitGroup + // (Re-)create a fresh tgroups map to not keep stale targets around. We + // will retrieve all targets below anyway, so cleaning up everything is + // safe and doesn't inflict any additional cost. + ts.tgroups = map[string]*config.TargetGroup{} + + for name, prov := range providers { + wg.Add(1) + + updates := make(chan []*config.TargetGroup) + go prov.Run(ctx, updates) + + go func(name string, prov TargetProvider) { + select { + case <-ctx.Done(): + case initial, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + break + } + // First set of all targets the provider knows. + for _, tgroup := range initial { + ts.setTargetGroup(name, tgroup) + } + case <-time.After(5 * time.Second): + // Initial set didn't arrive. Act as if it was empty + // and wait for updates later on. + } + wg.Done() + + // Start listening for further updates. + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + for _, tg := range tgs { + ts.update(name, tg) + } + } + } + }(name, prov) + } + + // We wait for a full initial set of target groups before releasing the mutex + // to ensure the initial sync is complete and there are no races with subsequent updates. + wg.Wait() + // Just signal that there are initial sets to sync now. Actual syncing must only + // happen in the runScraping loop. + select { + case ts.syncCh <- struct{}{}: + default: + } +} + +// update handles a target group update from a target provider identified by the name. +func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) { + ts.mtx.Lock() + defer ts.mtx.Unlock() + + ts.setTargetGroup(name, tgroup) + + select { + case ts.syncCh <- struct{}{}: + default: + } +} + +func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) { + if tg == nil { + return + } + ts.tgroups[name+"/"+tg.Source] = tg +} diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go new file mode 100644 index 000000000..edb3480ac --- /dev/null +++ b/discovery/discovery_test.go @@ -0,0 +1,87 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "testing" + + "github.com/prometheus/prometheus/config" + "golang.org/x/net/context" + yaml "gopkg.in/yaml.v2" +) + +func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { + + verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) { + if _, ok := tgroups[name]; ok != present { + msg := "" + if !present { + msg = "not " + } + t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups) + } + } + + scrapeConfig := &config.ScrapeConfig{} + + sOne := ` +job_name: "foo" +static_configs: +- targets: ["foo:9090"] +- targets: ["bar:9090"] +` + if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil { + t.Fatalf("Unable to load YAML config sOne: %s", err) + } + called := make(chan struct{}) + + ts := NewTargetSet(&mockSyncer{ + sync: func([]*config.TargetGroup) { called <- struct{}{} }, + }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ts.Run(ctx) + + ts.UpdateProviders(ProvidersFromConfig(scrapeConfig)) + <-called + + verifyPresence(ts.tgroups, "static/0/0", true) + verifyPresence(ts.tgroups, "static/0/1", true) + + sTwo := ` +job_name: "foo" +static_configs: +- targets: ["foo:9090"] +` + if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil { + t.Fatalf("Unable to load YAML config sTwo: %s", err) + } + + ts.UpdateProviders(ProvidersFromConfig(scrapeConfig)) + <-called + + verifyPresence(ts.tgroups, "static/0/0", true) + verifyPresence(ts.tgroups, "static/0/1", false) +} + +type mockSyncer struct { + sync func(tgs []*config.TargetGroup) +} + +func (s *mockSyncer) Sync(tgs []*config.TargetGroup) { + if s.sync != nil { + s.sync(tgs) + } +} diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 8a9d8899b..e79d0f6c6 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -115,7 +115,7 @@ type scrapePool struct { newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop } -func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { +func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { client, err := NewHTTPClient(cfg) if err != nil { // Any errors that could occur here should be caught during config validation. @@ -124,6 +124,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape return &scrapePool{ appender: app, config: cfg, + ctx: ctx, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, @@ -131,13 +132,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape } } -func (sp *scrapePool) init(ctx context.Context) { - sp.mtx.Lock() - defer sp.mtx.Unlock() - - sp.ctx = ctx -} - // stop terminates all scrape loops and returns after they all terminated. func (sp *scrapePool) stop() { var wg sync.WaitGroup @@ -165,6 +159,7 @@ func (sp *scrapePool) stop() { // This method returns after all scrape loops that were stopped have fully terminated. func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { start := time.Now() + sp.mtx.Lock() defer sp.mtx.Unlock() @@ -206,11 +201,32 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { ) } +// Sync converts target groups into actual scrape targets and synchronizes +// the currently running scraper with the resulting set. +func (sp *scrapePool) Sync(tgs []*config.TargetGroup) { + start := time.Now() + + var all []*Target + for _, tg := range tgs { + targets, err := targetsFromGroup(tg, sp.config) + if err != nil { + log.With("err", err).Error("creating targets failed") + continue + } + all = append(all, targets...) + } + sp.sync(all) + + targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( + time.Since(start).Seconds(), + ) + targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() +} + // sync takes a list of potentially duplicated targets, deduplicates them, starts // scrape loops for new targets, and stops scrape loops for disappeared targets. // It returns after all stopped scrape loops terminated. func (sp *scrapePool) sync(targets []*Target) { - start := time.Now() sp.mtx.Lock() defer sp.mtx.Unlock() @@ -256,10 +272,6 @@ func (sp *scrapePool) sync(targets []*Target) { // may be active and tries to insert. The old scraper that didn't terminate yet could still // be inserting a previous sample set. wg.Wait() - targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe( - time.Since(start).Seconds(), - ) - targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() } // sampleAppender returns an appender for ingested samples from the target. diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 489d73c36..aaa19132f 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -36,7 +36,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppender{} cfg = &config.ScrapeConfig{} - sp = newScrapePool(cfg, app) + sp = newScrapePool(context.Background(), cfg, app) ) if a, ok := sp.appender.(*nopAppender); !ok || a != app { @@ -231,7 +231,7 @@ func TestScrapePoolReportAppender(t *testing.T) { target := newTestTarget("example.com:80", 10*time.Millisecond, nil) app := &nopAppender{} - sp := newScrapePool(cfg, app) + sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false wrapped := sp.reportAppender(target) @@ -266,7 +266,7 @@ func TestScrapePoolSampleAppender(t *testing.T) { target := newTestTarget("example.com:80", 10*time.Millisecond, nil) app := &nopAppender{} - sp := newScrapePool(cfg, app) + sp := newScrapePool(context.Background(), cfg, app) cfg.HonorLabels = false wrapped := sp.sampleAppender(target) diff --git a/retrieval/target.go b/retrieval/target.go index af66b094c..01a2bd262 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -17,6 +17,7 @@ import ( "fmt" "hash/fnv" "io/ioutil" + "net" "net/http" "net/url" "strings" @@ -276,3 +277,103 @@ func (app relabelAppender) Append(s *model.Sample) error { return app.SampleAppender.Append(s) } + +// populateLabels builds a label set from the given label set and scrape configuration. +// It returns a label set before relabeling was applied as the second return value. +// Returns a nil label set if the target is dropped during relabeling. +func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { + if _, ok := lset[model.AddressLabel]; !ok { + return nil, nil, fmt.Errorf("no address") + } + // Copy labels into the labelset for the target if they are not + // set already. Apply the labelsets in order of decreasing precedence. + scrapeLabels := model.LabelSet{ + model.SchemeLabel: model.LabelValue(cfg.Scheme), + model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), + model.JobLabel: model.LabelValue(cfg.JobName), + } + for ln, lv := range scrapeLabels { + if _, ok := lset[ln]; !ok { + lset[ln] = lv + } + } + // Encode scrape query parameters as labels. + for k, v := range cfg.Params { + if len(v) > 0 { + lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) + } + } + + preRelabelLabels := lset + lset = relabel.Process(lset, cfg.RelabelConfigs...) + + // Check if the target was dropped. + if lset == nil { + return nil, nil, nil + } + + // addPort checks whether we should add a default port to the address. + // If the address is not valid, we don't append a port either. + addPort := func(s string) bool { + // If we can split, a port exists and we don't have to add one. + if _, _, err := net.SplitHostPort(s); err == nil { + return false + } + // If adding a port makes it valid, the previous error + // was not due to an invalid address and we can append a port. + _, _, err := net.SplitHostPort(s + ":1234") + return err == nil + } + // If it's an address with no trailing port, infer it based on the used scheme. + if addr := string(lset[model.AddressLabel]); addPort(addr) { + // Addresses reaching this point are already wrapped in [] if necessary. + switch lset[model.SchemeLabel] { + case "http", "": + addr = addr + ":80" + case "https": + addr = addr + ":443" + default: + return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) + } + lset[model.AddressLabel] = model.LabelValue(addr) + } + if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { + return nil, nil, err + } + + // Meta labels are deleted after relabelling. Other internal labels propagate to + // the target which decides whether they will be part of their label set. + for ln := range lset { + if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { + delete(lset, ln) + } + } + + // Default the instance label to the target address. + if _, ok := lset[model.InstanceLabel]; !ok { + lset[model.InstanceLabel] = lset[model.AddressLabel] + } + return lset, preRelabelLabels, nil +} + +// targetsFromGroup builds targets based on the given TargetGroup and config. +func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { + targets := make([]*Target, 0, len(tg.Targets)) + + for i, lset := range tg.Targets { + // Combine target labels with target group labels. + for ln, lv := range tg.Labels { + if _, ok := lset[ln]; !ok { + lset[ln] = lv + } + } + labels, origLabels, err := populateLabels(lset, cfg) + if err != nil { + return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) + } + if labels != nil { + targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) + } + } + return targets, nil +} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 8df1f3b22..7d96da02d 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -14,12 +14,8 @@ package retrieval import ( - "fmt" - "net" "sort" - "strings" "sync" - "time" "github.com/prometheus/common/log" "github.com/prometheus/common/model" @@ -27,7 +23,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/storage" ) @@ -47,6 +42,14 @@ type TargetManager struct { targetSets map[string]*targetSet } +type targetSet struct { + ctx context.Context + cancel func() + + ts *discovery.TargetSet + sp *scrapePool +} + // NewTargetManager creates a new TargetManager. func NewTargetManager(app storage.SampleAppender) *TargetManager { return &TargetManager{ @@ -95,23 +98,33 @@ func (tm *TargetManager) reload() { ts, ok := tm.targetSets[scfg.JobName] if !ok { - ts = newTargetSet(scfg, tm.appender) + ctx, cancel := context.WithCancel(tm.ctx) + ts = &targetSet{ + ctx: ctx, + cancel: cancel, + sp: newScrapePool(ctx, scfg, tm.appender), + } + ts.ts = discovery.NewTargetSet(ts.sp) + tm.targetSets[scfg.JobName] = ts tm.wg.Add(1) go func(ts *targetSet) { - ts.runScraping(tm.ctx) + // Run target set, which blocks until its context is canceled. + // Gracefully shut down pending scrapes in the scrape pool afterwards. + ts.ts.Run(ctx) + ts.sp.stop() tm.wg.Done() }(ts) } else { - ts.reload(scfg) + ts.sp.reload(scfg) } - ts.runProviders(tm.ctx, discovery.ProvidersFromConfig(scfg)) + ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg)) } - // Remove old target sets. Waiting for stopping is already guaranteed - // by the goroutine that started the target set. + // Remove old target sets. Waiting for scrape pools to complete pending + // scrape inserts is already guaranteed by the goroutine that started the target set. for name, ts := range tm.targetSets { if _, ok := jobs[name]; !ok { ts.cancel() @@ -129,14 +142,14 @@ func (tm *TargetManager) Pools() map[string]Targets { // TODO(fabxc): this is just a hack to maintain compatibility for now. for _, ps := range tm.targetSets { - ps.scrapePool.mtx.RLock() + ps.sp.mtx.RLock() - for _, t := range ps.scrapePool.targets { + for _, t := range ps.sp.targets { job := string(t.Labels()[model.JobLabel]) pools[job] = append(pools[job], t) } - ps.scrapePool.mtx.RUnlock() + ps.sp.mtx.RUnlock() } for _, targets := range pools { sort.Sort(targets) @@ -157,295 +170,3 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { } return nil } - -// targetSet holds several TargetProviders for which the same scrape configuration -// is used. It maintains target groups from all given providers and sync them -// to a scrape pool. -type targetSet struct { - mtx sync.RWMutex - - // Sets of targets by a source string that is unique across target providers. - tgroups map[string][]*Target - - scrapePool *scrapePool - config *config.ScrapeConfig - - syncCh chan struct{} - cancelScraping func() - cancelProviders func() -} - -func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet { - ts := &targetSet{ - scrapePool: newScrapePool(cfg, app), - syncCh: make(chan struct{}, 1), - config: cfg, - } - return ts -} - -func (ts *targetSet) cancel() { - ts.mtx.RLock() - defer ts.mtx.RUnlock() - - if ts.cancelScraping != nil { - ts.cancelScraping() - } - if ts.cancelProviders != nil { - ts.cancelProviders() - } -} - -func (ts *targetSet) reload(cfg *config.ScrapeConfig) { - ts.mtx.Lock() - ts.config = cfg - ts.mtx.Unlock() - - ts.scrapePool.reload(cfg) -} - -func (ts *targetSet) runScraping(ctx context.Context) { - ctx, ts.cancelScraping = context.WithCancel(ctx) - - ts.scrapePool.init(ctx) - -Loop: - for { - // Throttle syncing to once per five seconds. - select { - case <-ctx.Done(): - break Loop - case <-time.After(5 * time.Second): - } - - select { - case <-ctx.Done(): - break Loop - case <-ts.syncCh: - ts.mtx.RLock() - ts.sync() - ts.mtx.RUnlock() - } - } - - // We want to wait for all pending target scrapes to complete though to ensure there'll - // be no more storage writes after this point. - ts.scrapePool.stop() -} - -func (ts *targetSet) sync() { - var all []*Target - for _, targets := range ts.tgroups { - all = append(all, targets...) - } - ts.scrapePool.sync(all) -} - -func (ts *targetSet) runProviders(ctx context.Context, providers map[string]discovery.TargetProvider) { - // Lock for the entire time. This may mean up to 5 seconds until the full initial set - // is retrieved and applied. - // We could release earlier with some tweaks, but this is easier to reason about. - ts.mtx.Lock() - defer ts.mtx.Unlock() - - var wg sync.WaitGroup - - if ts.cancelProviders != nil { - ts.cancelProviders() - } - ctx, ts.cancelProviders = context.WithCancel(ctx) - - // (Re-)create a fresh tgroups map to not keep stale targets around. We - // will retrieve all targets below anyway, so cleaning up everything is - // safe and doesn't inflict any additional cost. - ts.tgroups = map[string][]*Target{} - - for name, prov := range providers { - wg.Add(1) - - updates := make(chan []*config.TargetGroup) - - go func(name string, prov discovery.TargetProvider) { - select { - case <-ctx.Done(): - case initial, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - // First set of all targets the provider knows. - for _, tgroup := range initial { - if tgroup == nil { - continue - } - targets, err := targetsFromGroup(tgroup, ts.config) - if err != nil { - log.With("target_group", tgroup).Errorf("Target update failed: %s", err) - continue - } - ts.tgroups[name+"/"+tgroup.Source] = targets - } - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - - wg.Done() - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - for _, tg := range tgs { - if err := ts.update(name, tg); err != nil { - log.With("target_group", tg).Errorf("Target update failed: %s", err) - } - } - } - } - }(name, prov) - - go prov.Run(ctx, updates) - } - - // We wait for a full initial set of target groups before releasing the mutex - // to ensure the initial sync is complete and there are no races with subsequent updates. - wg.Wait() - // Just signal that there are initial sets to sync now. Actual syncing must only - // happen in the runScraping loop. - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -// update handles a target group update from a target provider identified by the name. -func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error { - if tgroup == nil { - return nil - } - targets, err := targetsFromGroup(tgroup, ts.config) - if err != nil { - return err - } - - ts.mtx.Lock() - defer ts.mtx.Unlock() - - ts.tgroups[name+"/"+tgroup.Source] = targets - - select { - case ts.syncCh <- struct{}{}: - default: - } - - return nil -} - -// populateLabels builds a label set from the given label set and scrape configuration. -// It returns a label set before relabeling was applied as the second return value. -// Returns a nil label set if the target is dropped during relabeling. -func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) { - if _, ok := lset[model.AddressLabel]; !ok { - return nil, nil, fmt.Errorf("no address") - } - // Copy labels into the labelset for the target if they are not - // set already. Apply the labelsets in order of decreasing precedence. - scrapeLabels := model.LabelSet{ - model.SchemeLabel: model.LabelValue(cfg.Scheme), - model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath), - model.JobLabel: model.LabelValue(cfg.JobName), - } - for ln, lv := range scrapeLabels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv - } - } - // Encode scrape query parameters as labels. - for k, v := range cfg.Params { - if len(v) > 0 { - lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0]) - } - } - - preRelabelLabels := lset - lset = relabel.Process(lset, cfg.RelabelConfigs...) - - // Check if the target was dropped. - if lset == nil { - return nil, nil, nil - } - - // addPort checks whether we should add a default port to the address. - // If the address is not valid, we don't append a port either. - addPort := func(s string) bool { - // If we can split, a port exists and we don't have to add one. - if _, _, err := net.SplitHostPort(s); err == nil { - return false - } - // If adding a port makes it valid, the previous error - // was not due to an invalid address and we can append a port. - _, _, err := net.SplitHostPort(s + ":1234") - return err == nil - } - // If it's an address with no trailing port, infer it based on the used scheme. - if addr := string(lset[model.AddressLabel]); addPort(addr) { - // Addresses reaching this point are already wrapped in [] if necessary. - switch lset[model.SchemeLabel] { - case "http", "": - addr = addr + ":80" - case "https": - addr = addr + ":443" - default: - return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme) - } - lset[model.AddressLabel] = model.LabelValue(addr) - } - if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil { - return nil, nil, err - } - - // Meta labels are deleted after relabelling. Other internal labels propagate to - // the target which decides whether they will be part of their label set. - for ln := range lset { - if strings.HasPrefix(string(ln), model.MetaLabelPrefix) { - delete(lset, ln) - } - } - - // Default the instance label to the target address. - if _, ok := lset[model.InstanceLabel]; !ok { - lset[model.InstanceLabel] = lset[model.AddressLabel] - } - return lset, preRelabelLabels, nil -} - -// targetsFromGroup builds targets based on the given TargetGroup and config. -func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) { - targets := make([]*Target, 0, len(tg.Targets)) - - for i, lset := range tg.Targets { - // Combine target labels with target group labels. - for ln, lv := range tg.Labels { - if _, ok := lset[ln]; !ok { - lset[ln] = lv - } - } - labels, origLabels, err := populateLabels(lset, cfg) - if err != nil { - return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err) - } - if labels != nil { - targets = append(targets, NewTarget(labels, origLabels, cfg.Params)) - } - } - return targets, nil -} diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 7b04f1524..dc4568a95 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -17,65 +17,10 @@ import ( "reflect" "testing" - "golang.org/x/net/context" - "gopkg.in/yaml.v2" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/storage/local" ) -func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - - verifyPresence := func(tgroups map[string][]*Target, name string, present bool) { - if _, ok := tgroups[name]; ok != present { - msg := "" - if !present { - msg = "not " - } - t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups) - } - - } - - scrapeConfig := &config.ScrapeConfig{} - - sOne := ` -job_name: "foo" -static_configs: -- targets: ["foo:9090"] -- targets: ["bar:9090"] -` - if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil { - t.Fatalf("Unable to load YAML config sOne: %s", err) - } - - // Not properly setting it up, but that seems okay - mss := &local.MemorySeriesStorage{} - - ts := newTargetSet(scrapeConfig, mss) - - ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig)) - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", true) - - sTwo := ` -job_name: "foo" -static_configs: -- targets: ["foo:9090"] -` - if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil { - t.Fatalf("Unable to load YAML config sTwo: %s", err) - } - - ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig)) - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", false) -} - func mustNewRegexp(s string) config.Regexp { re, err := config.NewRegexp(s) if err != nil {