prometheus/retrieval/targetmanager.go

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
"fmt"
"net"
"sort"
"strings"
"sync"
"time"

"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/relabel"
"github.com/prometheus/prometheus/retrieval/discovery"
"github.com/prometheus/prometheus/storage"
)

// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// Providers must initially send all known target groups as soon as they can.
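//
// StaticProvider below is a minimal example of an implementation.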
type TargetProvider interface {
// Run hands a channel to the target provider through which it can send
// updated target groups. The channel must be closed by the target provider
// if no more updates will be sent.
// Run must return when the context is done.
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}

// TargetManager maintains a set of targets, starts and stops their scraping and
// creates new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
appender storage.SampleAppender
scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex
ctx context.Context
cancel func()
wg sync.WaitGroup
// Set of unique targets by scrape configuration.
targetSets map[string]*targetSet
}

// NewTargetManager creates a new TargetManager.
func NewTargetManager(app storage.SampleAppender) *TargetManager {
return &TargetManager{
appender: app,
targetSets: map[string]*targetSet{},
}
}

// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
log.Info("Starting target manager...")
tm.mtx.Lock()
tm.ctx, tm.cancel = context.WithCancel(context.Background())
tm.reload()
tm.mtx.Unlock()
tm.wg.Wait()
}

// Stop all background processing.
func (tm *TargetManager) Stop() {
log.Infoln("Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context. This will cause all target providers to shut down
// and all in-flight scrapes to abort immediately.
// Started inserts will be finished before terminating.
tm.cancel()
tm.mtx.Unlock()
// Wait for all scrape inserts to complete.
tm.wg.Wait()
log.Debugln("Target manager stopped")
}
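
// reload starts scraping for new scrape configurations, applies changed
// configurations to existing target sets, and stops target sets for jobs
// that were removed. The caller must hold tm.mtx.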
func (tm *TargetManager) reload() {
jobs := map[string]struct{}{}
// Start new target sets and update existing ones.
for _, scfg := range tm.scrapeConfigs {
jobs[scfg.JobName] = struct{}{}
ts, ok := tm.targetSets[scfg.JobName]
if !ok {
ts = newTargetSet(scfg, tm.appender)
tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1)
go func(ts *targetSet) {
ts.runScraping(tm.ctx)
tm.wg.Done()
}(ts)
} else {
ts.reload(scfg)
}
ts.runProviders(tm.ctx, providersFromConfig(scfg))
}
// Remove old target sets. Waiting for them to stop is already guaranteed
// by the goroutine that started the target set.
for name, ts := range tm.targetSets {
if _, ok := jobs[name]; !ok {
ts.cancel()
delete(tm.targetSets, name)
}
}
}

// Pools returns the targets currently being scraped bucketed by their job name.
func (tm *TargetManager) Pools() map[string]Targets {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
pools := map[string]Targets{}
// TODO(fabxc): this is just a hack to maintain compatibility for now.
for _, ps := range tm.targetSets {
ps.scrapePool.mtx.RLock()
for _, t := range ps.scrapePool.targets {
job := string(t.Labels()[model.JobLabel])
pools[job] = append(pools[job], t)
}
ps.scrapePool.mtx.RUnlock()
}
for _, targets := range pools {
sort.Sort(targets)
}
return pools
}

// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.scrapeConfigs = cfg.ScrapeConfigs
if tm.ctx != nil {
tm.reload()
}
return nil
}

// targetSet holds several TargetProviders for which the same scrape configuration
// is used. It maintains target groups from all given providers and syncs them
// to a scrape pool.
type targetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string][]*Target
scrapePool *scrapePool
config *config.ScrapeConfig
syncCh chan struct{}
cancelScraping func()
cancelProviders func()
}
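
// newTargetSet returns a new targetSet that scrapes targets with the given
// configuration and sends the resulting samples to the given appender.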
func newTargetSet(cfg *config.ScrapeConfig, app storage.SampleAppender) *targetSet {
ts := &targetSet{
scrapePool: newScrapePool(cfg, app),
syncCh: make(chan struct{}, 1),
config: cfg,
}
return ts
}
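
// cancel stops the target set's scraping and shuts down its target providers.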
func (ts *targetSet) cancel() {
ts.mtx.RLock()
defer ts.mtx.RUnlock()
if ts.cancelScraping != nil {
ts.cancelScraping()
}
if ts.cancelProviders != nil {
ts.cancelProviders()
}
}
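
// reload applies a new scrape configuration to the target set and its
// underlying scrape pool.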
func (ts *targetSet) reload(cfg *config.ScrapeConfig) {
ts.mtx.Lock()
ts.config = cfg
ts.mtx.Unlock()
ts.scrapePool.reload(cfg)
}
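
// runScraping initializes the scrape pool and syncs targets to it whenever
// the target set signals an update, at most once per five seconds. When the
// context is done, it stops the scrape pool.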
func (ts *targetSet) runScraping(ctx context.Context) {
ctx, ts.cancelScraping = context.WithCancel(ctx)
ts.scrapePool.init(ctx)
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.mtx.RLock()
ts.sync()
ts.mtx.RUnlock()
}
}
// We want to wait for all pending target scrapes to complete to ensure
// there will be no more storage writes after this point.
ts.scrapePool.stop()
}
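
// sync flattens the target groups of all providers into a single list of
// targets and hands it to the scrape pool. The caller must hold ts.mtx.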
func (ts *targetSet) sync() {
var all []*Target
for _, targets := range ts.tgroups {
all = append(all, targets...)
}
ts.scrapePool.sync(all)
}
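
// runProviders starts the given target providers, canceling any previously
// running ones, and blocks until each provider sent its initial set of
// target groups or a timeout expired.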
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
// is retrieved and applied.
// We could release earlier with some tweaks, but this is easier to reason about.
ts.mtx.Lock()
defer ts.mtx.Unlock()
var wg sync.WaitGroup
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't incur any additional cost.
ts.tgroups = map[string][]*Target{}
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go func(name string, prov TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
if tgroup == nil {
continue
}
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
log.With("target_group", tgroup).Errorf("Target update failed: %s", err)
continue
}
ts.tgroups[name+"/"+tgroup.Source] = targets
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
if err := ts.update(name, tg); err != nil {
log.With("target_group", tg).Errorf("Target update failed: %s", err)
}
}
}
}
}(name, prov)
go prov.Run(ctx, updates)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}

// update handles a target group update from the target provider identified by
// the given name.
func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
if tgroup == nil {
return nil
}
targets, err := targetsFromGroup(tgroup, ts.config)
if err != nil {
return err
}
ts.mtx.Lock()
defer ts.mtx.Unlock()
ts.tgroups[name+"/"+tgroup.Source] = targets
select {
case ts.syncCh <- struct{}{}:
default:
}
return nil
}

// providersFromConfig returns all TargetProviders configured in cfg.
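// The returned providers are keyed by mechanism and index, e.g. "dns/0" or
// "static/0".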
func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
providers := map[string]TargetProvider{}
app := func(mech string, i int, tp TargetProvider) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
}
for i, c := range cfg.DNSSDConfigs {
app("dns", i, discovery.NewDNS(c))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, discovery.NewFileDiscovery(c))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := discovery.NewConsul(c)
if err != nil {
log.Errorf("Cannot create Consul discovery: %s", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := discovery.NewMarathon(c)
if err != nil {
log.Errorf("Cannot create Marathon discovery: %s", err)
continue
}
app("marathon", i, m)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := discovery.NewKubernetesDiscovery(c)
if err != nil {
log.Errorf("Cannot create Kubernetes discovery: %s", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, discovery.NewServersetDiscovery(c))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, discovery.NewNerveDiscovery(c))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, discovery.NewEC2Discovery(c))
}
for i, c := range cfg.GCESDConfigs {
gced, err := discovery.NewGCEDiscovery(c)
if err != nil {
log.Errorf("Cannot initialize GCE discovery: %s", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, discovery.NewAzureDiscovery(c))
}
if len(cfg.StaticConfigs) > 0 {
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
}
return providers
}

// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns a nil label set if the target is dropped during relabeling.
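//
// For example, an address "host" with scheme "http" is completed to
// "host:80" before validation.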
func populateLabels(lset model.LabelSet, cfg *config.ScrapeConfig) (res, orig model.LabelSet, err error) {
if _, ok := lset[model.AddressLabel]; !ok {
return nil, nil, fmt.Errorf("no address")
}
// Copy labels into the labelset for the target if they are not
// set already. Apply the labelsets in order of decreasing precedence.
scrapeLabels := model.LabelSet{
model.SchemeLabel: model.LabelValue(cfg.Scheme),
model.MetricsPathLabel: model.LabelValue(cfg.MetricsPath),
model.JobLabel: model.LabelValue(cfg.JobName),
}
for ln, lv := range scrapeLabels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
// Encode scrape query parameters as labels.
for k, v := range cfg.Params {
if len(v) > 0 {
lset[model.LabelName(model.ParamLabelPrefix+k)] = model.LabelValue(v[0])
}
}
preRelabelLabels := lset
lset = relabel.Process(lset, cfg.RelabelConfigs...)
// Check if the target was dropped.
if lset == nil {
return nil, nil, nil
}
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
if _, _, err := net.SplitHostPort(s); err == nil {
return false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
}
// If it's an address with no trailing port, infer it based on the used scheme.
if addr := string(lset[model.AddressLabel]); addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset[model.SchemeLabel] {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
default:
return nil, nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
}
lset[model.AddressLabel] = model.LabelValue(addr)
}
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
return nil, nil, err
}
// Meta labels are deleted after relabeling. Other internal labels propagate to
// the target, which decides whether they will be part of its label set.
for ln := range lset {
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
delete(lset, ln)
}
}
// Default the instance label to the target address.
if _, ok := lset[model.InstanceLabel]; !ok {
lset[model.InstanceLabel] = lset[model.AddressLabel]
}
return lset, preRelabelLabels, nil
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets))
for i, lset := range tg.Targets {
// Combine target labels with target group labels.
for ln, lv := range tg.Labels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
}
}
labels, origLabels, err := populateLabels(lset, cfg)
if err != nil {
return nil, fmt.Errorf("instance %d in group %s: %s", i, tg, err)
}
if labels != nil {
targets = append(targets, NewTarget(labels, origLabels, cfg.Params))
}
}
return targets, nil
}

// StaticProvider holds a list of target groups that never change.
type StaticProvider struct {
TargetGroups []*config.TargetGroup
}

// NewStaticProvider returns a StaticProvider configured with the given
// target groups.
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
for i, tg := range groups {
tg.Source = fmt.Sprintf("%d", i)
}
return &StaticProvider{groups}
}

// Run implements the TargetProvider interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider the case where the consumer exits right away,
// in which case the context will be canceled.
select {
case ch <- sd.TargetGroups:
case <-ctx.Done():
}
close(ch)
}