discovery: move TargetSet into discovery package

This commit is contained in:
Fabian Reinartz 2016-11-22 12:48:30 +01:00
parent d19d1bcad3
commit d7f4f8b879
7 changed files with 412 additions and 381 deletions

View file

@ -15,6 +15,8 @@ package discovery
import ( import (
"fmt" "fmt"
"sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
@ -37,12 +39,12 @@ import (
// The TargetProvider does not have to guarantee that an actual change happened. // The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens. // It does guarantee that it sends the new TargetGroup whenever a change happens.
// //
// Providers must initially send all known target groups as soon as it can. // TargetProviders should initially send a full set of all discoverable TargetGroups.
type TargetProvider interface { type TargetProvider interface {
// Run hands a channel to the target provider through which it can send // Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider // updated target groups.
// if no more updates will be sent. // Must returns if the context gets canceled. It should not close the update
// On receiving from done Run must return. // channel on returning.
Run(ctx context.Context, up chan<- []*config.TargetGroup) Run(ctx context.Context, up chan<- []*config.TargetGroup)
} }
@ -135,3 +137,166 @@ func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGro
} }
close(ch) close(ch)
} }
// TargetSet handles multiple TargetProviders and sends a full overview of their
// discovered TargetGroups to a Syncer.
type TargetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string]*config.TargetGroup
syncer Syncer
syncCh chan struct{}
providerCh chan map[string]TargetProvider
cancelProviders func()
}
// Syncer receives updates complete sets of TargetGroups.
type Syncer interface {
Sync([]*config.TargetGroup)
}
// NewTargetSet returns a new target sending TargetGroups to the Syncer.
func NewTargetSet(s Syncer) *TargetSet {
return &TargetSet{
syncCh: make(chan struct{}, 1),
providerCh: make(chan map[string]TargetProvider),
syncer: s,
}
}
// Run starts the processing of target providers and their updates.
// It blocks until the context gets canceled.
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.sync()
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
}
}
}
func (ts *TargetSet) sync() {
ts.mtx.RLock()
var all []*config.TargetGroup
for _, tg := range ts.tgroups {
all = append(all, tg)
}
ts.mtx.RUnlock()
ts.syncer.Sync(all)
}
// UpdateProviders sets new target providers for the target set.
func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
ts.providerCh <- p
}
func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
// is retrieved and applied.
// We could release earlier with some tweaks, but this is easier to reason about.
ts.mtx.Lock()
defer ts.mtx.Unlock()
// Stop all previous target providers of the target set.
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
var wg sync.WaitGroup
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't inflict any additional cost.
ts.tgroups = map[string]*config.TargetGroup{}
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go prov.Run(ctx, updates)
go func(name string, prov TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
ts.setTargetGroup(name, tgroup)
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
ts.update(name, tg)
}
}
}
}(name, prov)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}
// update handles a target group update from a target provider identified by the name.
func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
ts.mtx.Lock()
defer ts.mtx.Unlock()
ts.setTargetGroup(name, tgroup)
select {
case ts.syncCh <- struct{}{}:
default:
}
}
func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
if tg == nil {
return
}
ts.tgroups[name+"/"+tg.Source] = tg
}

View file

@ -0,0 +1,87 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"testing"
"github.com/prometheus/prometheus/config"
"golang.org/x/net/context"
yaml "gopkg.in/yaml.v2"
)
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) {
if _, ok := tgroups[name]; ok != present {
msg := ""
if !present {
msg = "not "
}
t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups)
}
}
scrapeConfig := &config.ScrapeConfig{}
sOne := `
job_name: "foo"
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
`
if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
called := make(chan struct{})
ts := NewTargetSet(&mockSyncer{
sync: func([]*config.TargetGroup) { called <- struct{}{} },
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ts.Run(ctx)
ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)
verifyPresence(ts.tgroups, "static/0/1", true)
sTwo := `
job_name: "foo"
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil {
t.Fatalf("Unable to load YAML config sTwo: %s", err)
}
ts.UpdateProviders(ProvidersFromConfig(scrapeConfig))
<-called
verifyPresence(ts.tgroups, "static/0/0", true)
verifyPresence(ts.tgroups, "static/0/1", false)
}
type mockSyncer struct {
sync func(tgs []*config.TargetGroup)
}
func (s *mockSyncer) Sync(tgs []*config.TargetGroup) {
if s.sync != nil {
s.sync(tgs)
}
}

View file

@ -115,7 +115,7 @@ type scrapePool struct {
newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop newLoop func(context.Context, scraper, storage.SampleAppender, storage.SampleAppender) loop
} }
func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool { func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
client, err := NewHTTPClient(cfg) client, err := NewHTTPClient(cfg)
if err != nil { if err != nil {
// Any errors that could occur here should be caught during config validation. // Any errors that could occur here should be caught during config validation.
@ -124,6 +124,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
return &scrapePool{ return &scrapePool{
appender: app, appender: app,
config: cfg, config: cfg,
ctx: ctx,
client: client, client: client,
targets: map[uint64]*Target{}, targets: map[uint64]*Target{},
loops: map[uint64]loop{}, loops: map[uint64]loop{},
@ -131,13 +132,6 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.SampleAppender) *scrape
} }
} }
func (sp *scrapePool) init(ctx context.Context) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
sp.ctx = ctx
}
// stop terminates all scrape loops and returns after they all terminated. // stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() { func (sp *scrapePool) stop() {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -165,6 +159,7 @@ func (sp *scrapePool) stop() {
// This method returns after all scrape loops that were stopped have fully terminated. // This method returns after all scrape loops that were stopped have fully terminated.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
start := time.Now() start := time.Now()
sp.mtx.Lock() sp.mtx.Lock()
defer sp.mtx.Unlock() defer sp.mtx.Unlock()
@ -206,11 +201,32 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
) )
} }
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set.
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
start := time.Now()
var all []*Target
for _, tg := range tgs {
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
log.With("err", err).Error("creating targets failed")
continue
}
all = append(all, targets...)
}
sp.sync(all)
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
// sync takes a list of potentially duplicated targets, deduplicates them, starts // sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets. // scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated. // It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) { func (sp *scrapePool) sync(targets []*Target) {
start := time.Now()
sp.mtx.Lock() sp.mtx.Lock()
defer sp.mtx.Unlock() defer sp.mtx.Unlock()
@ -256,10 +272,6 @@ func (sp *scrapePool) sync(targets []*Target) {
// may be active and tries to insert. The old scraper that didn't terminate yet could still // may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set. // be inserting a previous sample set.
wg.Wait() wg.Wait()
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
} }
// sampleAppender returns an appender for ingested samples from the target. // sampleAppender returns an appender for ingested samples from the target.

View file

@ -36,7 +36,7 @@ func TestNewScrapePool(t *testing.T) {
var ( var (
app = &nopAppender{} app = &nopAppender{}
cfg = &config.ScrapeConfig{} cfg = &config.ScrapeConfig{}
sp = newScrapePool(cfg, app) sp = newScrapePool(context.Background(), cfg, app)
) )
if a, ok := sp.appender.(*nopAppender); !ok || a != app { if a, ok := sp.appender.(*nopAppender); !ok || a != app {
@ -231,7 +231,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil) target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{} app := &nopAppender{}
sp := newScrapePool(cfg, app) sp := newScrapePool(context.Background(), cfg, app)
cfg.HonorLabels = false cfg.HonorLabels = false
wrapped := sp.reportAppender(target) wrapped := sp.reportAppender(target)
@ -266,7 +266,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil) target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppender{} app := &nopAppender{}
sp := newScrapePool(cfg, app) sp := newScrapePool(context.Background(), cfg, app)
cfg.HonorLabels = false cfg.HonorLabels = false
wrapped := sp.sampleAppender(target) wrapped := sp.sampleAppender(target)

View file

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -276,3 +277,103 @@ func (app relabelAppender) Append(s *model.Sample) error {
return app.SampleAppender.Append(s) return app.SampleAppender.Append(s)
} }
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling.
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
if _, ok := lset[model.AddressLabel]; !ok {
return nil, nil, fmt.Errorf("no address")
}
// Copy labels into the labelset for the target if they are not
// set already. Apply the labelsets in order of decreasing precedence.
scrapeLabels := model.LabelSet{
model.SchemeLabel: model.LabelValue(cfg.Scheme),
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
model.JobLabel: model.LabelValue(cfg.JobName),
}
for ln, lv := range scrapeLabels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if len(v) > 0 {
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
}
}
preRelabelLabels := lset
lset = relabel.Process(lset, cfg.RelabelConfigs...)
// Check if the target was dropped.
if lset == nil {
return nil, nil, nil
}
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
if _, _, err := net.SplitHostPort(s); err == nil {
return false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
}
// If it's an address with no trailing port, infer it based on the used scheme.
if addr := string(lset[model.AddressLabel]); addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset[model.SchemeLabel] {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
default:
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
}
lset[model.AddressLabel] = model.LabelValue(addr)
}
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
return nil, nil, err
}
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
for ln := range lset {
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
delete(lset, ln)
}
}
// Default the instance label to the target address.
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = lset[model.AddressLabel]
}
return lset, preRelabelLabels, nil
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets))
for i, lset := range tg.Targets {
// Combine target labels with target group labels.
for ln, lv := range tg.Labels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
labels, origLabels, err := populateLabels(lset, cfg)
if err != nil {
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
}
if labels != nil {
targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
}
}
return targets, nil
}

View file

@ -14,12 +14,8 @@
package retrieval package retrieval
import ( import (
"fmt"
"net"
"sort" "sort"
"strings"
"sync" "sync"
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -27,7 +23,6 @@ import (
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
@ -47,6 +42,14 @@ type TargetManager struct {
targetSets map[string]*targetSet targetSets map[string]*targetSet
} }
type targetSet struct {
ctx context.Context
cancel func()
ts *discovery.TargetSet
sp *scrapePool
}
// NewTargetManager creates a new TargetManager. // NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.SampleAppender) *TargetManager { func NewTargetManager(app storage.SampleAppender) *TargetManager {
return &TargetManager{ return &TargetManager{
@ -95,23 +98,33 @@ func (tm *TargetManager) reload() {
ts, ok := tm.targetSets[scfg.JobName] ts, ok := tm.targetSets[scfg.JobName]
if !ok { if !ok {
ts = newTargetSet(scfg, tm.appender) ctx, cancel := context.WithCancel(tm.ctx)
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.appender),
}
ts.ts = discovery.NewTargetSet(ts.sp)
tm.targetSets[scfg.JobName] = ts tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1) tm.wg.Add(1)
go func(ts *targetSet) { go func(ts *targetSet) {
ts.runScraping(tm.ctx) // Run target set, which blocks until its context is canceled.
// Gracefully shut down pending scrapes in the scrape pool afterwards.
ts.ts.Run(ctx)
ts.sp.stop()
tm.wg.Done() tm.wg.Done()
}(ts) }(ts)
} else { } else {
ts.reload(scfg) ts.sp.reload(scfg)
} }
ts.runProviders(tm.ctx, discovery.ProvidersFromConfig(scfg)) ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg))
} }
// Remove old target sets. Waiting for stopping is already guaranteed // Remove old target sets. Waiting for scrape pools to complete pending
// by the goroutine that started the target set. // scrape inserts is already guaranteed by the goroutine that started the target set.
for name, ts := range tm.targetSets { for name, ts := range tm.targetSets {
if _, ok := jobs[name]; !ok { if _, ok := jobs[name]; !ok {
ts.cancel() ts.cancel()
@ -129,14 +142,14 @@ func (tm *TargetManager) Pools() map[string]Targets {
// TODO(fabxc): this is just a hack to maintain compatibility for now. // TODO(fabxc): this is just a hack to maintain compatibility for now.
for _, ps := range tm.targetSets { for _, ps := range tm.targetSets {
ps.scrapePool.mtx.RLock() ps.sp.mtx.RLock()
for _, t := range ps.scrapePool.targets { for _, t := range ps.sp.targets {
job := string(t.Labels()[model.JobLabel]) job := string(t.Labels()[model.JobLabel])
pools[job] = append(pools[job], t) pools[job] = append(pools[job], t)
} }
ps.scrapePool.mtx.RUnlock() ps.sp.mtx.RUnlock()
} }
for _, targets := range pools { for _, targets := range pools {
sort.Sort(targets) sort.Sort(targets)
@ -157,295 +170,3 @@ func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
} }
return nil return nil
} }
// targetSet holds several TargetProviders for which the same scrape configuration
// is used. It maintains target groups from all given providers and sync them
// to a scrape pool.
type targetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string][]*Target
scrapePool *scrapePool
config *config.ScrapeConfig
syncCh chan struct{}
cancelScraping func()
cancelProviders func()
}
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
ts := &targetSet{
scrapePool: newScrapePool(cfg, app),
syncCh: make(chan struct{}, 1),
config: cfg,
}
return ts
}
func (ts *targetSet) cancel() {
ts.mtx.RLock()
defer ts.mtx.RUnlock()
if ts.cancelScraping != nil {
ts.cancelScraping()
}
if ts.cancelProviders != nil {
ts.cancelProviders()
}
}
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
ts.mtx.Lock()
ts.config = cfg
ts.mtx.Unlock()
ts.scrapePool.reload(cfg)
}
func (ts *targetSet) runScraping(ctx context.Context) {
ctx, ts.cancelScraping = context.WithCancel(ctx)
ts.scrapePool.init(ctx)
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.mtx.RLock()
ts.sync()
ts.mtx.RUnlock()
}
}
// We want to wait for all pending target scrapes to complete though to ensure there'll
// be no more storage writes after this point.
ts.scrapePool.stop()
}
func (ts *targetSet) sync() {
var all []*Target
for _, targets := range ts.tgroups {
all = append(all, targets...)
}
ts.scrapePool.sync(all)
}
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]discovery.TargetProvider) {
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
// is retrieved and applied.
// We could release earlier with some tweaks, but this is easier to reason about.
ts.mtx.Lock()
defer ts.mtx.Unlock()
var wg sync.WaitGroup
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't inflict any additional cost.
ts.tgroups = map[string][]*Target{}
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go func(name string, prov discovery.TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
if tgroup == nil {
continue
}
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
continue
}
ts.tgroups[name+"/"+tgroup.Source] = targets
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
if err := ts.update(name, tg); err != nil {
log.With("target_group", tg).Errorf("Target update failed: %s", err)
}
}
}
}
}(name, prov)
go prov.Run(ctx, updates)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}
// update handles a target group update from a target provider identified by the name.
func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
if tgroup == nil {
return nil
}
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
return err
}
ts.mtx.Lock()
defer ts.mtx.Unlock()
ts.tgroups[name+"/"+tgroup.Source] = targets
select {
case ts.syncCh <- struct{}{}:
default:
}
return nil
}
// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling.
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
if _, ok := lset[model.AddressLabel]; !ok {
return nil, nil, fmt.Errorf("no address")
}
// Copy labels into the labelset for the target if they are not
// set already. Apply the labelsets in order of decreasing precedence.
scrapeLabels := model.LabelSet{
model.SchemeLabel: model.LabelValue(cfg.Scheme),
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
model.JobLabel: model.LabelValue(cfg.JobName),
}
for ln, lv := range scrapeLabels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if len(v) > 0 {
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
}
}
preRelabelLabels := lset
lset = relabel.Process(lset, cfg.RelabelConfigs...)
// Check if the target was dropped.
if lset == nil {
return nil, nil, nil
}
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
if _, _, err := net.SplitHostPort(s); err == nil {
return false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
}
// If it's an address with no trailing port, infer it based on the used scheme.
if addr := string(lset[model.AddressLabel]); addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset[model.SchemeLabel] {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
default:
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
}
lset[model.AddressLabel] = model.LabelValue(addr)
}
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
return nil, nil, err
}
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
for ln := range lset {
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
delete(lset, ln)
}
}
// Default the instance label to the target address.
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = lset[model.AddressLabel]
}
return lset, preRelabelLabels, nil
}
// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets))
for i, lset := range tg.Targets {
// Combine target labels with target group labels.
for ln, lv := range tg.Labels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
labels, origLabels, err := populateLabels(lset, cfg)
if err != nil {
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
}
if labels != nil {
targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
}
}
return targets, nil
}

View file

@ -17,65 +17,10 @@ import (
"reflect" "reflect"
"testing" "testing"
"golang.org/x/net/context"
"gopkg.in/yaml.v2"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/storage/local"
) )
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tgroups map[string][]*Target, name string, present bool) {
if _, ok := tgroups[name]; ok != present {
msg := ""
if !present {
msg = "not "
}
t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups)
}
}
scrapeConfig := &config.ScrapeConfig{}
sOne := `
job_name: "foo"
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
`
if err := yaml.Unmarshal([]byte(sOne), scrapeConfig); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
// Not properly setting it up, but that seems okay
mss := &local.MemorySeriesStorage{}
ts := newTargetSet(scrapeConfig, mss)
ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig))
verifyPresence(ts.tgroups, "static/0/0", true)
verifyPresence(ts.tgroups, "static/0/1", true)
sTwo := `
job_name: "foo"
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.Unmarshal([]byte(sTwo), scrapeConfig); err != nil {
t.Fatalf("Unable to load YAML config sTwo: %s", err)
}
ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig))
verifyPresence(ts.tgroups, "static/0/0", true)
verifyPresence(ts.tgroups, "static/0/1", false)
}
func mustNewRegexp(s string) config.Regexp { func mustNewRegexp(s string) config.Regexp {
re, err := config.NewRegexp(s) re, err := config.NewRegexp(s)
if err != nil { if err != nil {