From 432005826d06e6d6ac8f2d3836f7fdf66d9c3092 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Wed, 20 Oct 2021 10:15:54 +0200 Subject: [PATCH] Add a feature flag to enable the new discovery manager (#9537) * Add a feature flag to enable the new manager This PR creates a copy of the legacy manager and uses it by default. It is a companion PR to #9349. With this PR, users can enable the new discovery manager and provide us with any feedback / side effects that the new behaviour might have. Signed-off-by: Julien Pivotto --- cmd/prometheus/main.go | 34 +- discovery/legacymanager/manager.go | 357 +++++++ discovery/legacymanager/manager_test.go | 1143 +++++++++++++++++++++++ discovery/legacymanager/registry.go | 259 +++++ discovery/manager.go | 2 +- docs/feature_flags.md | 24 +- 6 files changed, 1809 insertions(+), 10 deletions(-) create mode 100644 discovery/legacymanager/manager.go create mode 100644 discovery/legacymanager/manager_test.go create mode 100644 discovery/legacymanager/registry.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 6eb850f383..d50cd3e7f3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -58,6 +58,8 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" _ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations. + "github.com/prometheus/prometheus/discovery/legacymanager" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" @@ -122,6 +124,7 @@ type flagConfig struct { enablePromQLAtModifier bool enablePromQLNegativeOffset bool enableExpandExternalLabels bool + enableNewSDManager bool prometheusURL string corsRegexString string @@ -156,6 +159,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "extra-scrape-metrics": c.scrape.ExtraMetrics = true level.Info(logger).Log("msg", "Experimental additional scrape metrics") + case "new-service-discovery-manager": + c.enableNewSDManager = true + level.Info(logger).Log("msg", "Experimental service discovery manager") case "": continue default: @@ -319,7 +325,7 @@ func main() { a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). 
Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) @@ -459,11 +465,22 @@ func main() { notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier")) ctxScrape, cancelScrape = context.WithCancel(context.Background()) - discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape")) - ctxNotify, cancelNotify = context.WithCancel(context.Background()) - discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify")) + discoveryManagerScrape discoveryManager + discoveryManagerNotify discoveryManager + ) + if cfg.enableNewSDManager { + discovery.RegisterMetrics() + discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape")) + discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify")) + } else { + legacymanager.RegisterMetrics() + discoveryManagerScrape = legacymanager.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), legacymanager.Name("scrape")) + discoveryManagerNotify = legacymanager.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), legacymanager.Name("notify")) + } + + var ( scrapeManager = scrape.NewManager(&cfg.scrape, log.With(logger, "component", "scrape manager"), fanoutStorage) opts = promql.EngineOpts{ @@ -1346,3 +1363,12 @@ func (l jaegerLogger) Infof(msg string, args ...interface{}) { keyvals := []interface{}{"msg", fmt.Sprintf(msg, args...)} level.Info(l.logger).Log(keyvals...) } + +// discoveryManager interfaces the discovery manager. This is used to keep using +// the manager that restarts SD's on reload for a few releases until we feel +// the new manager can be enabled for all users. +type discoveryManager interface { + ApplyConfig(cfg map[string]discovery.Configs) error + Run() error + SyncCh() <-chan map[string][]*targetgroup.Group +} diff --git a/discovery/legacymanager/manager.go b/discovery/legacymanager/manager.go new file mode 100644 index 0000000000..7a3d6b3b82 --- /dev/null +++ b/discovery/legacymanager/manager.go @@ -0,0 +1,357 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package legacymanager + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +var ( + failedConfigs = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "prometheus_sd_failed_configs", + Help: "Current number of service discovery configurations that failed to load.", + }, + []string{"name"}, + ) + discoveredTargets = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "prometheus_sd_discovered_targets", + Help: "Current number of discovered targets.", + }, + []string{"name", "config"}, + ) + receivedUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_received_updates_total", + Help: "Total number of update events received from the SD providers.", + }, + []string{"name"}, + ) + delayedUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_updates_delayed_total", + Help: "Total number of update events that couldn't be sent immediately.", + }, + []string{"name"}, + ) + sentUpdates = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_sd_updates_total", + Help: "Total number of update events sent to the SD consumers.", + }, + []string{"name"}, + ) +) + +func RegisterMetrics() { + prometheus.MustRegister(failedConfigs, discoveredTargets, receivedUpdates, delayedUpdates, sentUpdates) +} + +type poolKey struct { + setName string + provider string +} + +// provider holds a Discoverer instance, its configuration and its subscribers. +type provider struct { + name string + d discovery.Discoverer + subs []string + config interface{} +} + +// NewManager is the Discovery Manager constructor. +func NewManager(ctx context.Context, logger log.Logger, options ...func(*Manager)) *Manager { + if logger == nil { + logger = log.NewNopLogger() + } + mgr := &Manager{ + logger: logger, + syncCh: make(chan map[string][]*targetgroup.Group), + targets: make(map[poolKey]map[string]*targetgroup.Group), + discoverCancel: []context.CancelFunc{}, + ctx: ctx, + updatert: 5 * time.Second, + triggerSend: make(chan struct{}, 1), + } + for _, option := range options { + option(mgr) + } + return mgr +} + +// Name sets the name of the manager. +func Name(n string) func(*Manager) { + return func(m *Manager) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.name = n + } +} + +// Manager maintains a set of discovery providers and sends each update to a map channel. +// Targets are grouped by the target set name. +type Manager struct { + logger log.Logger + name string + mtx sync.RWMutex + ctx context.Context + discoverCancel []context.CancelFunc + + // Some Discoverers(eg. k8s) send only the updates for a given target group + // so we use map[tg.Source]*targetgroup.Group to know which group to update. + targets map[poolKey]map[string]*targetgroup.Group + // providers keeps track of SD providers. + providers []*provider + // The sync channel sends the updates as a map where the key is the job value from the scrape config. + syncCh chan map[string][]*targetgroup.Group + + // How long to wait before sending updates to the channel. The variable + // should only be modified in unit tests. + updatert time.Duration + + // The triggerSend channel signals to the manager that new updates have been received from providers. 
+ triggerSend chan struct{} +} + +// Run starts the background processing +func (m *Manager) Run() error { + go m.sender() + for range m.ctx.Done() { + m.cancelDiscoverers() + return m.ctx.Err() + } + return nil +} + +// SyncCh returns a read only channel used by all the clients to receive target updates. +func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { + return m.syncCh +} + +// ApplyConfig removes all running discovery providers and starts new ones using the provided config. +func (m *Manager) ApplyConfig(cfg map[string]discovery.Configs) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + for pk := range m.targets { + if _, ok := cfg[pk.setName]; !ok { + discoveredTargets.DeleteLabelValues(m.name, pk.setName) + } + } + m.cancelDiscoverers() + m.targets = make(map[poolKey]map[string]*targetgroup.Group) + m.providers = nil + m.discoverCancel = nil + + failedCount := 0 + for name, scfg := range cfg { + failedCount += m.registerProviders(scfg, name) + discoveredTargets.WithLabelValues(m.name, name).Set(0) + } + failedConfigs.WithLabelValues(m.name).Set(float64(failedCount)) + + for _, prov := range m.providers { + m.startProvider(m.ctx, prov) + } + + return nil +} + +// StartCustomProvider is used for sdtool. Only use this if you know what you're doing. +func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker discovery.Discoverer) { + p := &provider{ + name: name, + d: worker, + subs: []string{name}, + } + m.providers = append(m.providers, p) + m.startProvider(ctx, p) +} + +func (m *Manager) startProvider(ctx context.Context, p *provider) { + level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs)) + ctx, cancel := context.WithCancel(ctx) + updates := make(chan []*targetgroup.Group) + + m.discoverCancel = append(m.discoverCancel, cancel) + + go p.d.Run(ctx, updates) + go m.updater(ctx, p, updates) +} + +func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) { + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + receivedUpdates.WithLabelValues(m.name).Inc() + if !ok { + level.Debug(m.logger).Log("msg", "Discoverer channel closed", "provider", p.name) + return + } + + for _, s := range p.subs { + m.updateGroup(poolKey{setName: s, provider: p.name}, tgs) + } + + select { + case m.triggerSend <- struct{}{}: + default: + } + } + } +} + +func (m *Manager) sender() { + ticker := time.NewTicker(m.updatert) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker. + select { + case <-m.triggerSend: + sentUpdates.WithLabelValues(m.name).Inc() + select { + case m.syncCh <- m.allGroups(): + default: + delayedUpdates.WithLabelValues(m.name).Inc() + level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") + select { + case m.triggerSend <- struct{}{}: + default: + } + } + default: + } + } + } +} + +func (m *Manager) cancelDiscoverers() { + for _, c := range m.discoverCancel { + c() + } +} + +func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) { + m.mtx.Lock() + defer m.mtx.Unlock() + + if _, ok := m.targets[poolKey]; !ok { + m.targets[poolKey] = make(map[string]*targetgroup.Group) + } + for _, tg := range tgs { + if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics. 
+ m.targets[poolKey][tg.Source] = tg + } + } +} + +func (m *Manager) allGroups() map[string][]*targetgroup.Group { + m.mtx.RLock() + defer m.mtx.RUnlock() + + tSets := map[string][]*targetgroup.Group{} + n := map[string]int{} + for pkey, tsets := range m.targets { + for _, tg := range tsets { + // Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager' + // to signal that it needs to stop all scrape loops for this target set. + tSets[pkey.setName] = append(tSets[pkey.setName], tg) + n[pkey.setName] += len(tg.Targets) + } + } + for setName, v := range n { + discoveredTargets.WithLabelValues(m.name, setName).Set(float64(v)) + } + return tSets +} + +// registerProviders returns a number of failed SD config. +func (m *Manager) registerProviders(cfgs discovery.Configs, setName string) int { + var ( + failed int + added bool + ) + add := func(cfg discovery.Config) { + for _, p := range m.providers { + if reflect.DeepEqual(cfg, p.config) { + p.subs = append(p.subs, setName) + added = true + return + } + } + typ := cfg.Name() + d, err := cfg.NewDiscoverer(discovery.DiscovererOptions{ + Logger: log.With(m.logger, "discovery", typ), + }) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", typ) + failed++ + return + } + m.providers = append(m.providers, &provider{ + name: fmt.Sprintf("%s/%d", typ, len(m.providers)), + d: d, + config: cfg, + subs: []string{setName}, + }) + added = true + } + for _, cfg := range cfgs { + add(cfg) + } + if !added { + // Add an empty target group to force the refresh of the corresponding + // scrape pool and to notify the receiver that this target set has no + // current targets. + // It can happen because the combined set of SD configurations is empty + // or because we fail to instantiate all the SD configurations. + add(discovery.StaticConfig{{}}) + } + return failed +} + +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*targetgroup.Group +} + +// Run implements the Worker interface. +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): + } + close(ch) +} diff --git a/discovery/legacymanager/manager_test.go b/discovery/legacymanager/manager_test.go new file mode 100644 index 0000000000..a977fcd292 --- /dev/null +++ b/discovery/legacymanager/manager_test.go @@ -0,0 +1,1143 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package legacymanager + +import ( + "context" + "fmt" + "sort" + "strconv" + "testing" + "time" + + "github.com/go-kit/log" + client_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestMain(m *testing.M) { + testutil.TolerantVerifyLeak(m) +} + +// TestTargetUpdatesOrder checks that the target updates are received in the expected order. +func TestTargetUpdatesOrder(t *testing.T) { + + // The order by which the updates are send is determined by the interval passed to the mock discovery adapter + // Final targets array is ordered alphabetically by the name of the discoverer. + // For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge. + testCases := []struct { + title string + updates map[string][]update + expectedTargets [][]*targetgroup.Group + }{ + { + title: "Single TP no updates", + updates: map[string][]update{ + "tp1": {}, + }, + expectedTargets: nil, + }, + { + title: "Multiple TPs no updates", + updates: map[string][]update{ + "tp1": {}, + "tp2": {}, + "tp3": {}, + }, + expectedTargets: nil, + }, + { + title: "Single TP empty initials", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{}, + interval: 5 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + {}, + }, + }, + { + title: "Multiple TPs empty initials", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{}, + interval: 5 * time.Millisecond, + }, + }, + "tp2": { + { + targetGroups: []targetgroup.Group{}, + interval: 200 * time.Millisecond, + }, + }, + "tp3": { + { + targetGroups: []targetgroup.Group{}, + interval: 100 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + {}, + {}, + {}, + }, + }, + { + title: "Single TP initials only", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }}, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + { + title: "Multiple TPs initials only", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + "tp2": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, + interval: 10 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2_group1", + Targets: 
[]model.LabelSet{{"__instance__": "3"}}, + }, + }, + }, + }, + { + title: "Single TP initials followed by empty updates", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + }, + }, + { + title: "Single TP initials and new groups", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + interval: 10 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + }, + { + title: "Multiple TPs initials and new groups", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group4", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 500 * time.Millisecond, + }, + }, + "tp2": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 100 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp2_group3", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2_group4", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 10 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + 
Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2_group4", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp1_group3", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group4", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + { + Source: "tp2_group3", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2_group4", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + }, + }, + { + title: "One TP initials arrive after other TP updates.", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 150 * time.Millisecond, + }, + }, + "tp2": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 200 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 100 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, 
+ { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2_group1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2_group2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + }, + }, + + { + title: "Single TP empty update in between", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 30 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + interval: 10 * time.Millisecond, + }, + { + targetGroups: []targetgroup.Group{ + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 300 * time.Millisecond, + }, + }, + }, + expectedTargets: [][]*targetgroup.Group{ + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{}, + }, + }, + { + { + Source: "tp1_group1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1_group2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + }, + }, + } + + for i, tc := range testCases { + tc := tc + t.Run(tc.title, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + + var totalUpdatesCount int + provUpdates := make(chan []*targetgroup.Group) + for _, up := range tc.updates { + go newMockDiscoveryProvider(up...).Run(ctx, provUpdates) + if len(up) > 0 { + totalUpdatesCount += len(up) + } + } + + for x := 0; x < totalUpdatesCount; x++ { + select { + case <-ctx.Done(): + t.Fatalf("%d: no update arrived within the timeout limit", x) + case tgs := <-provUpdates: + discoveryManager.updateGroup(poolKey{setName: strconv.Itoa(i), provider: tc.title}, tgs) + for _, got := range discoveryManager.allGroups() { + assertEqualGroups(t, got, tc.expectedTargets[x], func(got, expected string) string { + return fmt.Sprintf("%d: \ntargets mismatch \ngot: %v \nexpected: %v", + x, + got, + expected) + }) + } + } + } + }) + } +} + +func assertEqualGroups(t *testing.T, got, expected []*targetgroup.Group, msg func(got, expected string) string) { + t.Helper() + + // Need to sort by the groups's source as the received order is not guaranteed. 
+ sort.Sort(byGroupSource(got)) + sort.Sort(byGroupSource(expected)) + + require.Equal(t, expected, got) +} + +func staticConfig(addrs ...string) discovery.StaticConfig { + var cfg discovery.StaticConfig + for i, addr := range addrs { + cfg = append(cfg, &targetgroup.Group{ + Source: fmt.Sprint(i), + Targets: []model.LabelSet{ + {model.AddressLabel: model.LabelValue(addr)}, + }, + }) + } + return cfg +} + +func verifyPresence(t *testing.T, tSets map[poolKey]map[string]*targetgroup.Group, poolKey poolKey, label string, present bool) { + t.Helper() + if _, ok := tSets[poolKey]; !ok { + t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) + return + } + + match := false + var mergedTargets string + for _, targetGroup := range tSets[poolKey] { + + for _, l := range targetGroup.Targets { + mergedTargets = mergedTargets + " " + l.String() + if l.String() == label { + match = true + } + } + + } + if match != present { + msg := "" + if !present { + msg = "not" + } + t.Fatalf("%q should %s be present in Targets labels: %q", label, msg, mergedTargets) + } +} + +func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]discovery.Configs{ + "prometheus": { + staticConfig("foo:9090", "bar:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) + + c["prometheus"] = discovery.Configs{ + staticConfig("foo:9090"), + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) +} + +func TestDiscovererConfigs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]discovery.Configs{ + "prometheus": { + staticConfig("foo:9090", "bar:9090"), + staticConfig("baz:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/1"}, "{__address__=\"baz:9090\"}", true) +} + +// TestTargetSetRecreatesEmptyStaticConfigs ensures that reloading a config file after +// removing all targets from the static_configs sends an update with empty targetGroups. +// This is required to signal the receiver that this target set has no current targets. 
+func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]discovery.Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + + c["prometheus"] = discovery.Configs{ + discovery.StaticConfig{{}}, + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + + pkey := poolKey{setName: "prometheus", provider: "static/0"} + targetGroups, ok := discoveryManager.targets[pkey] + if !ok { + t.Fatalf("'%v' should be present in target groups", pkey) + } + group, ok := targetGroups[""] + if !ok { + t.Fatalf("missing '' key in target groups %v", targetGroups) + } + + if len(group.Targets) != 0 { + t.Fatalf("Invalid number of targets: expected 0, got %d", len(group.Targets)) + } +} + +func TestIdenticalConfigurationsAreCoalesced(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, nil) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]discovery.Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + "prometheus2": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + <-discoveryManager.SyncCh() + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(t, discoveryManager.targets, poolKey{setName: "prometheus2", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + if len(discoveryManager.providers) != 1 { + t.Fatalf("Invalid number of providers: expected 1, got %d", len(discoveryManager.providers)) + } +} + +func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) { + originalConfig := discovery.Configs{ + staticConfig("foo:9090", "bar:9090", "baz:9090"), + } + processedConfig := discovery.Configs{ + staticConfig("foo:9090", "bar:9090", "baz:9090"), + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + cfgs := map[string]discovery.Configs{ + "prometheus": processedConfig, + } + discoveryManager.ApplyConfig(cfgs) + <-discoveryManager.SyncCh() + + for _, cfg := range cfgs { + require.Equal(t, originalConfig, cfg) + } +} + +type errorConfig struct{ err error } + +func (e errorConfig) Name() string { return "error" } +func (e errorConfig) NewDiscoverer(discovery.DiscovererOptions) (discovery.Discoverer, error) { + return nil, e.err +} + +func TestGaugeFailedConfigs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger()) + discoveryManager.updatert = 100 * time.Millisecond + go discoveryManager.Run() + + c := map[string]discovery.Configs{ + "prometheus": { + errorConfig{fmt.Errorf("tests error 0")}, + errorConfig{fmt.Errorf("tests error 1")}, + errorConfig{fmt.Errorf("tests error 2")}, + }, + } + discoveryManager.ApplyConfig(c) + <-discoveryManager.SyncCh() + + failedCount := client_testutil.ToFloat64(failedConfigs) + if failedCount != 3 { + 
t.Fatalf("Expected to have 3 failed configs, got: %v", failedCount) + } + + c["prometheus"] = discovery.Configs{ + staticConfig("foo:9090"), + } + discoveryManager.ApplyConfig(c) + <-discoveryManager.SyncCh() + + failedCount = client_testutil.ToFloat64(failedConfigs) + if failedCount != 0 { + t.Fatalf("Expected to get no failed config, got: %v", failedCount) + } + +} + +func TestCoordinationWithReceiver(t *testing.T) { + updateDelay := 100 * time.Millisecond + + type expect struct { + delay time.Duration + tgs map[string][]*targetgroup.Group + } + + testCases := []struct { + title string + providers map[string]discovery.Discoverer + expected []expect + }{ + { + title: "Receiver should get all updates even when one provider closes its channel", + providers: map[string]discovery.Discoverer{ + "once1": &onceProvider{ + tgs: []*targetgroup.Group{ + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + "mock1": newMockDiscoveryProvider( + update{ + interval: 2 * updateDelay, + targetGroups: []targetgroup.Group{ + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + ), + }, + expected: []expect{ + { + tgs: map[string][]*targetgroup.Group{ + "once1": { + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + }, + { + tgs: map[string][]*targetgroup.Group{ + "once1": { + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + "mock1": { + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + }, + }, + { + title: "Receiver should get all updates even when the channel is blocked", + providers: map[string]discovery.Discoverer{ + "mock1": newMockDiscoveryProvider( + update{ + targetGroups: []targetgroup.Group{ + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + update{ + interval: 4 * updateDelay, + targetGroups: []targetgroup.Group{ + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + ), + }, + expected: []expect{ + { + delay: 2 * updateDelay, + tgs: map[string][]*targetgroup.Group{ + "mock1": { + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + }, + }, + }, + { + delay: 4 * updateDelay, + tgs: map[string][]*targetgroup.Group{ + "mock1": { + { + Source: "tg1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tg2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.title, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + mgr := NewManager(ctx, nil) + mgr.updatert = updateDelay + go mgr.Run() + + for name, p := range tc.providers { + mgr.StartCustomProvider(ctx, name, p) + } + + for i, expected := range tc.expected { + time.Sleep(expected.delay) + select { + case <-ctx.Done(): + t.Fatalf("step %d: no update received in the expected timeframe", i) + case tgs, ok := <-mgr.SyncCh(): + if !ok { + t.Fatalf("step %d: discovery manager channel is closed", i) + } + if len(tgs) != len(expected.tgs) { + t.Fatalf("step %d: target groups mismatch, got: %d, expected: %d\ngot: %#v\nexpected: %#v", + i, len(tgs), len(expected.tgs), tgs, expected.tgs) + } + for k := range expected.tgs { + if _, ok := tgs[k]; !ok { + t.Fatalf("step %d: target group not found: %s\ngot: %#v", i, k, tgs) + } + assertEqualGroups(t, tgs[k], expected.tgs[k], func(got, expected string) string { + 
return fmt.Sprintf("step %d: targets mismatch \ngot: %q \nexpected: %q", i, got, expected) + }) + } + } + } + }) + } +} + +type update struct { + targetGroups []targetgroup.Group + interval time.Duration +} + +type mockdiscoveryProvider struct { + updates []update +} + +func newMockDiscoveryProvider(updates ...update) mockdiscoveryProvider { + tp := mockdiscoveryProvider{ + updates: updates, + } + return tp +} + +func (tp mockdiscoveryProvider) Run(ctx context.Context, upCh chan<- []*targetgroup.Group) { + for _, u := range tp.updates { + if u.interval > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(u.interval): + } + } + tgs := make([]*targetgroup.Group, len(u.targetGroups)) + for i := range u.targetGroups { + tgs[i] = &u.targetGroups[i] + } + upCh <- tgs + } + <-ctx.Done() +} + +// byGroupSource implements sort.Interface so we can sort by the Source field. +type byGroupSource []*targetgroup.Group + +func (a byGroupSource) Len() int { return len(a) } +func (a byGroupSource) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byGroupSource) Less(i, j int) bool { return a[i].Source < a[j].Source } + +// onceProvider sends updates once (if any) and closes the update channel. +type onceProvider struct { + tgs []*targetgroup.Group +} + +func (o onceProvider) Run(_ context.Context, ch chan<- []*targetgroup.Group) { + if len(o.tgs) > 0 { + ch <- o.tgs + } + close(ch) +} diff --git a/discovery/legacymanager/registry.go b/discovery/legacymanager/registry.go new file mode 100644 index 0000000000..fb01e16488 --- /dev/null +++ b/discovery/legacymanager/registry.go @@ -0,0 +1,259 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package legacymanager + +import ( + "fmt" + "reflect" + "sort" + "strconv" + "strings" + "sync" + + "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +const ( + configFieldPrefix = "AUTO_DISCOVERY_" + staticConfigsKey = "static_configs" + staticConfigsFieldName = configFieldPrefix + staticConfigsKey +) + +var ( + configNames = make(map[string]discovery.Config) + configFieldNames = make(map[reflect.Type]string) + configFields []reflect.StructField + + configTypesMu sync.Mutex + configTypes = make(map[reflect.Type]reflect.Type) + + emptyStructType = reflect.TypeOf(struct{}{}) + configsType = reflect.TypeOf(discovery.Configs{}) +) + +// RegisterConfig registers the given Config type for YAML marshaling and unmarshaling. +func RegisterConfig(config discovery.Config) { + registerConfig(config.Name()+"_sd_configs", reflect.TypeOf(config), config) +} + +func init() { + // N.B.: static_configs is the only Config type implemented by default. + // All other types are registered at init by their implementing packages. 
+ elemTyp := reflect.TypeOf(&targetgroup.Group{}) + registerConfig(staticConfigsKey, elemTyp, discovery.StaticConfig{}) +} + +func registerConfig(yamlKey string, elemType reflect.Type, config discovery.Config) { + name := config.Name() + if _, ok := configNames[name]; ok { + panic(fmt.Sprintf("discovery: Config named %q is already registered", name)) + } + configNames[name] = config + + fieldName := configFieldPrefix + yamlKey // Field must be exported. + configFieldNames[elemType] = fieldName + + // Insert fields in sorted order. + i := sort.Search(len(configFields), func(k int) bool { + return fieldName < configFields[k].Name + }) + configFields = append(configFields, reflect.StructField{}) // Add empty field at end. + copy(configFields[i+1:], configFields[i:]) // Shift fields to the right. + configFields[i] = reflect.StructField{ // Write new field in place. + Name: fieldName, + Type: reflect.SliceOf(elemType), + Tag: reflect.StructTag(`yaml:"` + yamlKey + `,omitempty"`), + } +} + +func getConfigType(out reflect.Type) reflect.Type { + configTypesMu.Lock() + defer configTypesMu.Unlock() + if typ, ok := configTypes[out]; ok { + return typ + } + // Initial exported fields map one-to-one. + var fields []reflect.StructField + for i, n := 0, out.NumField(); i < n; i++ { + switch field := out.Field(i); { + case field.PkgPath == "" && field.Type != configsType: + fields = append(fields, field) + default: + fields = append(fields, reflect.StructField{ + Name: "_" + field.Name, // Field must be unexported. + PkgPath: out.PkgPath(), + Type: emptyStructType, + }) + } + } + // Append extra config fields on the end. + fields = append(fields, configFields...) + typ := reflect.StructOf(fields) + configTypes[out] = typ + return typ +} + +// UnmarshalYAMLWithInlineConfigs helps implement yaml.Unmarshal for structs +// that have a Configs field that should be inlined. +func UnmarshalYAMLWithInlineConfigs(out interface{}, unmarshal func(interface{}) error) error { + outVal := reflect.ValueOf(out) + if outVal.Kind() != reflect.Ptr { + return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out) + } + outVal = outVal.Elem() + if outVal.Kind() != reflect.Struct { + return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out) + } + outTyp := outVal.Type() + + cfgTyp := getConfigType(outTyp) + cfgPtr := reflect.New(cfgTyp) + cfgVal := cfgPtr.Elem() + + // Copy shared fields (defaults) to dynamic value. + var configs *discovery.Configs + for i, n := 0, outVal.NumField(); i < n; i++ { + if outTyp.Field(i).Type == configsType { + configs = outVal.Field(i).Addr().Interface().(*discovery.Configs) + continue + } + if cfgTyp.Field(i).PkgPath != "" { + continue // Field is unexported: ignore. + } + cfgVal.Field(i).Set(outVal.Field(i)) + } + if configs == nil { + return fmt.Errorf("discovery: Configs field not found in type: %T", out) + } + + // Unmarshal into dynamic value. + if err := unmarshal(cfgPtr.Interface()); err != nil { + return replaceYAMLTypeError(err, cfgTyp, outTyp) + } + + // Copy shared fields from dynamic value. + for i, n := 0, outVal.NumField(); i < n; i++ { + if cfgTyp.Field(i).PkgPath != "" { + continue // Field is unexported: ignore. 
+ } + outVal.Field(i).Set(cfgVal.Field(i)) + } + + var err error + *configs, err = readConfigs(cfgVal, outVal.NumField()) + return err +} + +func readConfigs(structVal reflect.Value, startField int) (discovery.Configs, error) { + var ( + configs discovery.Configs + targets []*targetgroup.Group + ) + for i, n := startField, structVal.NumField(); i < n; i++ { + field := structVal.Field(i) + if field.Kind() != reflect.Slice { + panic("discovery: internal error: field is not a slice") + } + for k := 0; k < field.Len(); k++ { + val := field.Index(k) + if val.IsZero() || (val.Kind() == reflect.Ptr && val.Elem().IsZero()) { + key := configFieldNames[field.Type().Elem()] + key = strings.TrimPrefix(key, configFieldPrefix) + return nil, fmt.Errorf("empty or null section in %s", key) + } + switch c := val.Interface().(type) { + case *targetgroup.Group: + // Add index to the static config target groups for unique identification + // within scrape pool. + c.Source = strconv.Itoa(len(targets)) + // Coalesce multiple static configs into a single static config. + targets = append(targets, c) + case discovery.Config: + configs = append(configs, c) + default: + panic("discovery: internal error: slice element is not a Config") + } + } + } + if len(targets) > 0 { + configs = append(configs, discovery.StaticConfig(targets)) + } + return configs, nil +} + +// MarshalYAMLWithInlineConfigs helps implement yaml.Marshal for structs +// that have a Configs field that should be inlined. +func MarshalYAMLWithInlineConfigs(in interface{}) (interface{}, error) { + inVal := reflect.ValueOf(in) + for inVal.Kind() == reflect.Ptr { + inVal = inVal.Elem() + } + inTyp := inVal.Type() + + cfgTyp := getConfigType(inTyp) + cfgPtr := reflect.New(cfgTyp) + cfgVal := cfgPtr.Elem() + + // Copy shared fields to dynamic value. + var configs *discovery.Configs + for i, n := 0, inTyp.NumField(); i < n; i++ { + if inTyp.Field(i).Type == configsType { + configs = inVal.Field(i).Addr().Interface().(*discovery.Configs) + } + if cfgTyp.Field(i).PkgPath != "" { + continue // Field is unexported: ignore. + } + cfgVal.Field(i).Set(inVal.Field(i)) + } + if configs == nil { + return nil, fmt.Errorf("discovery: Configs field not found in type: %T", in) + } + + if err := writeConfigs(cfgVal, *configs); err != nil { + return nil, err + } + + return cfgPtr.Interface(), nil +} + +func writeConfigs(structVal reflect.Value, configs discovery.Configs) error { + targets := structVal.FieldByName(staticConfigsFieldName).Addr().Interface().(*[]*targetgroup.Group) + for _, c := range configs { + if sc, ok := c.(discovery.StaticConfig); ok { + *targets = append(*targets, sc...) 
+ continue + } + fieldName, ok := configFieldNames[reflect.TypeOf(c)] + if !ok { + return fmt.Errorf("discovery: cannot marshal unregistered Config type: %T", c) + } + field := structVal.FieldByName(fieldName) + field.Set(reflect.Append(field, reflect.ValueOf(c))) + } + return nil +} + +func replaceYAMLTypeError(err error, oldTyp, newTyp reflect.Type) error { + if e, ok := err.(*yaml.TypeError); ok { + oldStr := oldTyp.String() + newStr := newTyp.String() + for i, s := range e.Errors { + e.Errors[i] = strings.Replace(s, oldStr, newStr, -1) + } + } + return err +} diff --git a/discovery/manager.go b/discovery/manager.go index b3dae5c59f..639a241804 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -65,7 +65,7 @@ var ( ) ) -func init() { +func RegisterMetrics() { prometheus.MustRegister(failedConfigs, discoveredTargets, receivedUpdates, delayedUpdates, sentUpdates) } diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 07424d7318..8a8c9f70d4 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -1,9 +1,9 @@ --- -title: Feature Flags +title: Feature flags sort_rank: 11 --- -# Feature Flags +# Feature flags Here is a list of features that are disabled by default since they are breaking changes or are considered experimental. Their behaviour can change in future releases which will be communicated via the [release changelog](https://github.com/prometheus/prometheus/blob/main/CHANGELOG.md). @@ -46,7 +46,7 @@ More details can be found [here](querying/basics.md#offset-modifier). The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview). -## Exemplars Storage +## Exemplars storage `--enable-feature=exemplar-storage` @@ -54,7 +54,7 @@ The remote write receiver allows Prometheus to accept remote write requests from Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to WAL for local persistence (for WAL duration). -## Memory Snapshot on Shutdown +## Memory snapshot on shutdown `--enable-feature=memory-snapshot-on-shutdown` @@ -62,7 +62,7 @@ This takes the snapshot of the chunks that are in memory along with the series i it on disk. This will reduce the startup time since the memory state can be restored with this snapshot and m-mapped chunks without the need of WAL replay. -## Extra Scrape Metrics +## Extra scrape metrics `--enable-feature=extra-scrape-metrics` @@ -71,3 +71,17 @@ When enabled, for each instance scrape, Prometheus stores a sample in the follow - `scrape_timeout_seconds`. The configured `scrape_timeout` for a target. This allows you to measure each target to find out how close they are to timing out with `scrape_duration_seconds / scrape_timeout_seconds`. - `scrape_sample_limit`. The configured `sample_limit` for a target. This allows you to measure each target to find out how close they are to reaching the limit with `scrape_samples_post_metric_relabeling / scrape_sample_limit`. 
Note that `scrape_sample_limit` can be zero if there is no limit configured, which means that the query above can return `+Inf` for targets with no limit (as we divide by zero). If you want to query only for targets that do have a sample limit, use this query: `scrape_samples_post_metric_relabeling / (scrape_sample_limit > 0)`.
+
+## New service discovery manager
+
+`--enable-feature=new-service-discovery-manager`
+
+When enabled, Prometheus uses a new service discovery manager that does not
+restart unchanged discoveries upon reloading. This makes reloads faster and reduces
+pressure on service discoveries' sources.
+
+Users are encouraged to test the new service discovery manager and report any
+issues upstream.
+
+In future releases, this new service discovery manager will become the default and
+this feature flag will be ignored.
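
For readers who want to see how the manager copied into `discovery/legacymanager` by this patch is driven end to end, the sketch below wires its exported API together: `RegisterMetrics`, `NewManager` with the `Name` option, `ApplyConfig` keyed by job name, `Run`, and `SyncCh`. This is a minimal illustration, not code from the patch: the standalone `main` package, the `example-job` name, and the `localhost:9100` target are assumptions made for the example; inside Prometheus this wiring lives in `cmd/prometheus/main.go` behind the `discoveryManager` interface, which selects the new or legacy manager based on the feature flag.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/legacymanager"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// main.go registers metrics for whichever manager the feature flag selects;
	// here we register the legacy manager's collectors and create a named manager.
	legacymanager.RegisterMetrics()
	m := legacymanager.NewManager(ctx, log.NewNopLogger(), legacymanager.Name("scrape"))

	// Run starts the sender goroutine that forwards merged updates to SyncCh.
	go m.Run()

	// The configuration is keyed by scrape job name; a StaticConfig is the
	// simplest discovery.Config. Job name and address are illustrative only.
	cfg := map[string]discovery.Configs{
		"example-job": {
			discovery.StaticConfig{
				&targetgroup.Group{
					Targets: []model.LabelSet{{model.AddressLabel: "localhost:9100"}},
				},
			},
		},
	}
	if err := m.ApplyConfig(cfg); err != nil {
		panic(err)
	}

	// SyncCh delivers a map of job name -> merged target groups; in Prometheus
	// the scrape and notifier managers consume this channel.
	tgs := <-m.SyncCh()
	for job, groups := range tgs {
		fmt.Printf("job %q: %d target group(s)\n", job, len(groups))
	}
}
```

The first batch only appears on `SyncCh` after the manager's internal throttle interval (`updatert`, five seconds by default), which is why the tests in `manager_test.go` shorten it to 100 milliseconds.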