diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index b03db5891f..ad580e9979 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -43,6 +43,7 @@ import ( "github.com/prometheus/common/promlog" promlogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" @@ -230,27 +231,29 @@ func main() { cfg.queryEngine.Logger = log.With(logger, "component", "query engine") var ( - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager")) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ctx, cancelCtx = context.WithCancel(context.Background()) + ctxWeb, cancelWeb = context.WithCancel(context.Background()) + ctxRule = context.Background() + + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager")) + scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ruleManager = rules.NewManager(&rules.ManagerOptions{ + Appendable: fanoutStorage, + QueryFunc: rules.EngineQueryFunc(queryEngine), + NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), + Context: ctxRule, + ExternalURL: cfg.web.ExternalURL, + Registerer: prometheus.DefaultRegisterer, + Logger: log.With(logger, "component", "rule manager"), + }) ) - ruleManager := rules.NewManager(&rules.ManagerOptions{ - Appendable: fanoutStorage, - QueryFunc: rules.EngineQueryFunc(queryEngine), - NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), - Context: ctx, - ExternalURL: cfg.web.ExternalURL, - Registerer: prometheus.DefaultRegisterer, - Logger: log.With(logger, "component", "rule manager"), - }) - - cfg.web.Context = ctx + cfg.web.Context = ctxWeb cfg.web.TSDB = localStorage.Get cfg.web.Storage = fanoutStorage cfg.web.QueryEngine = queryEngine - cfg.web.TargetManager = targetManager + cfg.web.ScrapeManager = scrapeManager cfg.web.RuleManager = ruleManager cfg.web.Notifier = notifier @@ -268,6 +271,7 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } + // Depends on cfg.web.ScrapeManager, so it needs to come after the cfg.web.ScrapeManager = scrapeManager assignment above. webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) // Monitor outgoing connections on default transport with conntrack. @@ -277,9 +281,10 @@ func main() { reloaders := []func(cfg *config.Config) error{ remoteStorage.ApplyConfig, - targetManager.ApplyConfig, webHandler.ApplyConfig, notifier.ApplyConfig, + discoveryManager.ApplyConfig, + scrapeManager.ApplyConfig, func(cfg *config.Config) error { // Get all rule files matching the configuration paths.
var files []string @@ -326,6 +331,35 @@ func main() { }, ) } + { + ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background()) + g.Add( + func() error { + err := discoveryManager.Run(ctxDiscovery) + level.Info(logger).Log("msg", "Discovery manager stopped") + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping discovery manager...") + cancelDiscovery() + }, + ) + } + { + g.Add( + func() error { + err := scrapeManager.Run(discoveryManager.SyncCh()) + level.Info(logger).Log("msg", "Scrape manager stopped") + return err + }, + func(err error) { + // The scrape manager needs to be stopped before closing the local TSDB + // so that it doesn't try to write samples to a closed storage. + level.Info(logger).Log("msg", "Stopping scrape manager...") + scrapeManager.Stop() + }, + ) + } { // Make sure that sighup handler is registered with a redirect to the channel before the potentially // long and synchronous tsdb init. @@ -426,7 +460,7 @@ func main() { { g.Add( func() error { - if err := webHandler.Run(ctx); err != nil { + if err := webHandler.Run(ctxWeb); err != nil { return fmt.Errorf("Error starting web server: %s", err) } return nil @@ -435,7 +469,7 @@ func main() { // Keep this interrupt before the ruleManager.Stop(). // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - cancelCtx() + cancelWeb() }, ) } @@ -467,21 +501,6 @@ func main() { }, ) } - { - // TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel. - cancel := make(chan struct{}) - g.Add( - func() error { - targetManager.Run() - <-cancel - return nil - }, - func(err error) { - targetManager.Stop() - close(cancel) - }, - ) - } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) } diff --git a/config/config.go b/config/config.go index ab4208008d..73a938bca3 100644 --- a/config/config.go +++ b/config/config.go @@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error { return checkOverflow(a.XXX, "basic_auth") } -// TargetGroup is a set of targets with a common label set. +// TargetGroup is a set of targets with a common label set (e.g. production, test, staging). type TargetGroup struct { // Targets is a list of targets identified by a label set. Each target is // uniquely identifiable in the group by its address label. diff --git a/discovery/README.md b/discovery/README.md index a2899a35db..a4bdde9f34 100644 --- a/discovery/README.md +++ b/discovery/README.md @@ -133,9 +133,9 @@ the Prometheus server will be able to see them. A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [`TargetGroup`](https://godoc.org/github.com/prometheus/prometheus/config#TargetGroup). The SD mechanism sends the targets down to Prometheus as a list of `TargetGroups`.
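To make this concrete, here is a minimal sketch of such a mechanism: a hypothetical `exampleSD` type (invented for illustration) that announces one fixed `TargetGroup` and then waits for cancellation. It implements the `Discoverer` interface introduced just below.

```go
package mysd // hypothetical package, for illustration only

import (
	"context"

	"github.com/prometheus/prometheus/config"
)

// exampleSD announces a single fixed TargetGroup and then waits for shutdown.
type exampleSD struct {
	tg *config.TargetGroup
}

// Run sends the initial (and only) full set of target groups, then blocks
// until the context is canceled. Note that it does not close the update
// channel when it returns.
func (e *exampleSD) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	select {
	case up <- []*config.TargetGroup{e.tg}:
	case <-ctx.Done():
		return
	}
	// A real mechanism would keep watching its source here and resend the
	// affected TargetGroups whenever a potential change is detected.
	<-ctx.Done()
}
```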
-An SD mechanism has to implement the `TargetProvider` Interface: +An SD mechanism has to implement the `Discoverer` interface: ```go -type TargetProvider interface { +type Discoverer interface { Run(ctx context.Context, up chan<- []*config.TargetGroup) } ``` diff --git a/discovery/discovery.go b/discovery/discovery.go deleted file mode 100644 index 9cf5f8190f..0000000000 --- a/discovery/discovery.go +++ /dev/null @@ -1,319 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discovery - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/azure" - "github.com/prometheus/prometheus/discovery/consul" - "github.com/prometheus/prometheus/discovery/dns" - "github.com/prometheus/prometheus/discovery/ec2" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/gce" - "github.com/prometheus/prometheus/discovery/kubernetes" - "github.com/prometheus/prometheus/discovery/marathon" - "github.com/prometheus/prometheus/discovery/openstack" - "github.com/prometheus/prometheus/discovery/triton" - "github.com/prometheus/prometheus/discovery/zookeeper" -) - -// A TargetProvider provides information about target groups. It maintains a set -// of sources from which TargetGroups can originate. Whenever a target provider -// detects a potential change, it sends the TargetGroup through its provided channel. -// -// The TargetProvider does not have to guarantee that an actual change happened. -// It does guarantee that it sends the new TargetGroup whenever a change happens. -// -// TargetProviders should initially send a full set of all discoverable TargetGroups. -type TargetProvider interface { - // Run hands a channel to the target provider through which it can send - // updated target groups. - // Must returns if the context gets canceled. It should not close the update - // channel on returning. - Run(ctx context.Context, up chan<- []*config.TargetGroup) -} - -// ProvidersFromConfig returns all TargetProviders configured in cfg.
-func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider { - providers := map[string]TargetProvider{} - - app := func(mech string, i int, tp TargetProvider) { - providers[fmt.Sprintf("%s/%d", mech, i)] = tp - } - - for i, c := range cfg.DNSSDConfigs { - app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns"))) - } - for i, c := range cfg.FileSDConfigs { - app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file"))) - } - for i, c := range cfg.ConsulSDConfigs { - k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err) - continue - } - app("consul", i, k) - } - for i, c := range cfg.MarathonSDConfigs { - m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon")) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err) - continue - } - app("marathon", i, m) - } - for i, c := range cfg.KubernetesSDConfigs { - k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) - continue - } - app("kubernetes", i, k) - } - for i, c := range cfg.ServersetSDConfigs { - app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper"))) - } - for i, c := range cfg.NerveSDConfigs { - app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve"))) - } - for i, c := range cfg.EC2SDConfigs { - app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2"))) - } - for i, c := range cfg.OpenstackSDConfigs { - openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack")) - if err != nil { - level.Error(logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err) - continue - } - app("openstack", i, openstackd) - } - - for i, c := range cfg.GCESDConfigs { - gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce")) - if err != nil { - level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err) - continue - } - app("gce", i, gced) - } - for i, c := range cfg.AzureSDConfigs { - app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure"))) - } - for i, c := range cfg.TritonSDConfigs { - t, err := triton.New(log.With(logger, "discovery", "trition"), c) - if err != nil { - level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err) - continue - } - app("triton", i, t) - } - if len(cfg.StaticConfigs) > 0 { - app("static", 0, NewStaticProvider(cfg.StaticConfigs)) - } - - return providers -} - -// StaticProvider holds a list of target groups that never change. -type StaticProvider struct { - TargetGroups []*config.TargetGroup -} - -// NewStaticProvider returns a StaticProvider configured with the given -// target groups. -func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { - for i, tg := range groups { - tg.Source = fmt.Sprintf("%d", i) - } - return &StaticProvider{groups} -} - -// Run implements the TargetProvider interface. -func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - // We still have to consider that the consumer exits right away in which case - // the context will be canceled. 
- select { - case ch <- sd.TargetGroups: - case <-ctx.Done(): - } - close(ch) -} - -// TargetSet handles multiple TargetProviders and sends a full overview of their -// discovered TargetGroups to a Syncer. -type TargetSet struct { - mtx sync.RWMutex - // Sets of targets by a source string that is unique across target providers. - tgroups map[string]*config.TargetGroup - - syncer Syncer - - syncCh chan struct{} - providerCh chan map[string]TargetProvider - cancelProviders func() -} - -// Syncer receives updates complete sets of TargetGroups. -type Syncer interface { - Sync([]*config.TargetGroup) -} - -// NewTargetSet returns a new target sending TargetGroups to the Syncer. -func NewTargetSet(s Syncer) *TargetSet { - return &TargetSet{ - syncCh: make(chan struct{}, 1), - providerCh: make(chan map[string]TargetProvider), - syncer: s, - } -} - -// Run starts the processing of target providers and their updates. -// It blocks until the context gets canceled. -func (ts *TargetSet) Run(ctx context.Context) { -Loop: - for { - // Throttle syncing to once per five seconds. - select { - case <-ctx.Done(): - break Loop - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - case <-time.After(5 * time.Second): - } - - select { - case <-ctx.Done(): - break Loop - case <-ts.syncCh: - ts.sync() - case p := <-ts.providerCh: - ts.updateProviders(ctx, p) - } - } -} - -func (ts *TargetSet) sync() { - ts.mtx.RLock() - var all []*config.TargetGroup - for _, tg := range ts.tgroups { - all = append(all, tg) - } - ts.mtx.RUnlock() - - ts.syncer.Sync(all) -} - -// UpdateProviders sets new target providers for the target set. -func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) { - ts.providerCh <- p -} - -func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) { - - // Stop all previous target providers of the target set. - if ts.cancelProviders != nil { - ts.cancelProviders() - } - ctx, ts.cancelProviders = context.WithCancel(ctx) - - var wg sync.WaitGroup - // (Re-)create a fresh tgroups map to not keep stale targets around. We - // will retrieve all targets below anyway, so cleaning up everything is - // safe and doesn't inflict any additional cost. - ts.mtx.Lock() - ts.tgroups = map[string]*config.TargetGroup{} - ts.mtx.Unlock() - - for name, prov := range providers { - wg.Add(1) - - updates := make(chan []*config.TargetGroup) - go prov.Run(ctx, updates) - - go func(name string, prov TargetProvider) { - select { - case <-ctx.Done(): - case initial, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - break - } - // First set of all targets the provider knows. - for _, tgroup := range initial { - ts.setTargetGroup(name, tgroup) - } - case <-time.After(5 * time.Second): - // Initial set didn't arrive. Act as if it was empty - // and wait for updates later on. - } - wg.Done() - - // Start listening for further updates. - for { - select { - case <-ctx.Done(): - return - case tgs, ok := <-updates: - // Handle the case that a target provider exits and closes the channel - // before the context is done. - if !ok { - return - } - for _, tg := range tgs { - ts.update(name, tg) - } - } - } - }(name, prov) - } - - // We wait for a full initial set of target groups before releasing the mutex - // to ensure the initial sync is complete and there are no races with subsequent updates. - wg.Wait() - // Just signal that there are initial sets to sync now. 
Actual syncing must only - // happen in the runScraping loop. - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -// update handles a target group update from a target provider identified by the name. -func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) { - ts.setTargetGroup(name, tgroup) - - select { - case ts.syncCh <- struct{}{}: - default: - } -} - -func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) { - ts.mtx.Lock() - defer ts.mtx.Unlock() - - if tg == nil { - return - } - ts.tgroups[name+"/"+tg.Source] = tg -} diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go deleted file mode 100644 index 5c1fe6e653..0000000000 --- a/discovery/discovery_test.go +++ /dev/null @@ -1,1055 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discovery - -import ( - "context" - "reflect" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" - yaml "gopkg.in/yaml.v2" -) - -func TestTargetSetThrottlesTheSyncCalls(t *testing.T) { - testCases := []struct { - title string - updates map[string][]update - expectedSyncCalls [][]string - }{ - { - title: "Single TP no updates", - updates: map[string][]update{ - "tp1": {}, - }, - expectedSyncCalls: [][]string{ - {}, - }, - }, - { - title: "Multips TPs no updates", - updates: map[string][]update{ - "tp1": {}, - "tp2": {}, - "tp3": {}, - }, - expectedSyncCalls: [][]string{ - {}, - }, - }, - { - title: "Single TP empty initials", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{}, - interval: 5, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - }, - }, - { - title: "Multiple TPs empty initials", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{}, - interval: 5, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{}, - interval: 500, - }, - }, - "tp3": { - { - targetGroups: []config.TargetGroup{}, - interval: 100, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - }, - }, - { - title: "Multiple TPs empty initials with a delay", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{}, - interval: 6000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{}, - interval: 6500, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - }, - }, - { - title: "Single TP initials only", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - }, - }, - { - title: "Multiple TPs initials only", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 0, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}}, - interval: 0, - }, - }, - }, - expectedSyncCalls: 
[][]string{ - {"tp1-initial1", "tp1-initial2", "tp2-initial1"}, - }, - }, - { - title: "Single TP delayed initials only", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 6000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - {"initial1", "initial2"}, - }, - }, - { - title: "Multiple TPs with some delayed initials", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 100, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}, {Source: "tp2-initial3"}}, - interval: 6000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2", "tp2-initial3"}, - }, - }, - { - title: "Single TP initials followed by empty updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, - }, - { - targetGroups: []config.TargetGroup{}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - }, - }, - { - title: "Single TP initials and new groups", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 0, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - {"initial1", "initial2", "update1", "update2"}, - }, - }, - { - title: "Multiple TPs initials and new groups", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}, {Source: "tp2-update2"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp2-initial1", "tp2-initial2", "tp1-update1", "tp1-update2", "tp2-update1", "tp2-update2"}, - }, - }, - { - title: "One tp initials arrive after other tp updates but still within 5 seconds", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 2000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}, {Source: "tp2-update2"}}, - interval: 1000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2"}, - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2", "tp2-update1", "tp2-update2"}, - }, - }, - { - title: "One tp initials arrive after other tp updates and after 5 seconds", - updates: map[string][]update{ - "tp1": { - { - targetGroups: 
[]config.TargetGroup{{Source: "tp1-initial1"}, {Source: "tp1-initial2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}, {Source: "tp1-update2"}}, - interval: 1500, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update3"}}, - interval: 5000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}, {Source: "tp2-initial2"}}, - interval: 6000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2"}, - {"tp1-initial1", "tp1-initial2", "tp1-update1", "tp1-update2", "tp2-initial1", "tp2-initial2", "tp1-update3"}, - }, - }, - { - title: "Single TP initials and new groups after a delay", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 6000, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {}, - {"initial1", "initial2", "update1", "update2"}, - }, - }, - { - title: "Single TP initial and successive updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{{Source: "update2"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update3"}}, - interval: 10, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1"}, - {"initial1", "update1", "update2", "update3"}, - }, - }, - { - title: "Multiple TPs initials and successive updates", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "tp1-initial1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update2"}}, - interval: 2000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp1-update3"}}, - interval: 2000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{{Source: "tp2-initial1"}}, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update1"}}, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update2"}}, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{{Source: "tp2-update3"}}, - interval: 2000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"tp1-initial1", "tp1-update1", "tp2-initial1"}, - {"tp1-initial1", "tp1-update1", "tp2-initial1", "tp1-update2", "tp1-update3", "tp2-update1", "tp2-update2"}, - {"tp1-initial1", "tp1-update1", "tp2-initial1", "tp1-update2", "tp1-update3", "tp2-update1", "tp2-update2", "tp2-update3"}, - }, - }, - { - title: "Single TP Multiple updates 5 second window", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}}, - interval: 25, - }, - { - targetGroups: []config.TargetGroup{{Source: "update2"}, {Source: "update3"}, {Source: "update4"}}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update5"}}, - interval: 0, - }, - { - targetGroups: []config.TargetGroup{{Source: "update6"}, {Source: "update7"}, {Source: "update8"}}, - interval: 70, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1"}, - {"initial1", "update1", "update2", "update3", 
"update4", "update5", "update6", "update7", "update8"}, - }, - }, - { - title: "Single TP Single provider empty update in between", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 30, - }, - { - targetGroups: []config.TargetGroup{{Source: "update1"}, {Source: "update2"}}, - interval: 300, - }, - { - targetGroups: []config.TargetGroup{}, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{{Source: "update3"}, {Source: "update4"}, {Source: "update5"}, {Source: "update6"}}, - interval: 6000, - }, - }, - }, - expectedSyncCalls: [][]string{ - {"initial1", "initial2"}, - {"initial1", "initial2", "update1", "update2"}, - {"initial1", "initial2", "update1", "update2", "update3", "update4", "update5", "update6"}, - }, - }, - } - - for i, testCase := range testCases { - finalize := make(chan bool) - - syncCallCount := 0 - syncedGroups := make([][]string, 0) - - targetSet := NewTargetSet(&mockSyncer{ - sync: func(tgs []*config.TargetGroup) { - - currentCallGroup := make([]string, len(tgs)) - for i, tg := range tgs { - currentCallGroup[i] = tg.Source - } - syncedGroups = append(syncedGroups, currentCallGroup) - - syncCallCount++ - if syncCallCount == len(testCase.expectedSyncCalls) { - // All the groups are sent, we can start asserting. - close(finalize) - } - }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - targetProviders := map[string]TargetProvider{} - for tpName, tpUpdates := range testCase.updates { - tp := newMockTargetProvider(tpUpdates) - targetProviders[tpName] = tp - } - - go targetSet.Run(ctx) - targetSet.UpdateProviders(targetProviders) - - select { - case <-time.After(20 * time.Second): - t.Errorf("%d. %q: Test timed out after 20 seconds. All targets should be sent within the timeout", i, testCase.title) - - case <-finalize: - for name, tp := range targetProviders { - runCallCount := tp.(mockTargetProvider).callCount() - if runCallCount != 1 { - t.Errorf("%d. %q: TargetProvider Run should be called once for each target provider. For %q was called %d times", i, testCase.title, name, runCallCount) - } - } - - if len(syncedGroups) != len(testCase.expectedSyncCalls) { - t.Errorf("%d. %q: received sync calls: \n %v \n do not match expected calls: \n %v \n", i, testCase.title, syncedGroups, testCase.expectedSyncCalls) - } - - for j := range syncedGroups { - if len(syncedGroups[j]) != len(testCase.expectedSyncCalls[j]) { - t.Errorf("%d. %q: received sync calls in call [%v]: \n %v \n do not match expected calls: \n %v \n", i, testCase.title, j, syncedGroups[j], testCase.expectedSyncCalls[j]) - } - - expectedGroupsMap := make(map[string]struct{}) - for _, expectedGroup := range testCase.expectedSyncCalls[j] { - expectedGroupsMap[expectedGroup] = struct{}{} - } - - for _, syncedGroup := range syncedGroups[j] { - if _, ok := expectedGroupsMap[syncedGroup]; !ok { - t.Errorf("%d. %q: '%s' does not exist in expected target groups: %s", i, testCase.title, syncedGroup, testCase.expectedSyncCalls[j]) - } else { - delete(expectedGroupsMap, syncedGroup) // Remove used targets from the map. 
- } - } - } - } - } -} - -func TestTargetSetConsolidatesToTheLatestState(t *testing.T) { - testCases := []struct { - title string - updates map[string][]update - }{ - { - title: "Single TP update same initial group multiple times", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}, {"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 250, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 250, - }, - }, - }, - }, - { - title: "Multiple TPs update", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 3, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 10, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - }, - interval: 3, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 10, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 10, - }, - }, - }, - }, - { - title: "Multiple TPs update II", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}}, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.13:6003"}}, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.14:6003"}, - }, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - { - Source: "tp1-initial1", - Targets: []model.LabelSet{}, - }, - }, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.16:6003"}, - {"__instance__": "10.11.122.17:6003"}, - {"__instance__": "10.11.122.18:6003"}, - }, - }, - }, - interval: 100, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{}, - interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.13:6003"}}, - }, - }, - 
interval: 100, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-update2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - { - Source: "tp2-update1", - Targets: []model.LabelSet{}, - }, - }, - interval: 300, - }, - }, - }, - }, - { - title: "Three rounds of sync call", - updates: map[string][]update{ - "tp1": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.11:6003"}, - {"__instance__": "10.11.122.12:6003"}, - {"__instance__": "10.11.122.13:6003"}, - }, - }, - { - Source: "tp1-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.14:6003"}}, - }, - }, - interval: 1000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.12:6003"}}, - }, - { - Source: "tp1-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 3000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp1-initial1", - Targets: []model.LabelSet{}, - }, - { - Source: "tp1-update1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.15:6003"}, - {"__instance__": "10.11.122.16:6003"}, - }, - }, - }, - interval: 3000, - }, - }, - "tp2": { - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{ - {"__instance__": "10.11.122.11:6003"}, - }, - }, - { - Source: "tp2-initial2", - Targets: []model.LabelSet{{"__instance__": "10.11.122.14:6003"}}, - }, - { - Source: "tp2-initial3", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 6000, - }, - { - targetGroups: []config.TargetGroup{ - { - Source: "tp2-initial1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.11:6003"}, {"__instance__": "10.11.122.12:6003"}}, - }, - { - Source: "tp2-update1", - Targets: []model.LabelSet{{"__instance__": "10.11.122.15:6003"}}, - }, - }, - interval: 3000, - }, - }, - }, - }, - } - - // Function to determine if the sync call received the latest state of - // all the target groups that came out of the target provider. - endStateAchieved := func(groupsSentToSyc []*config.TargetGroup, endState map[string]config.TargetGroup) bool { - - if len(groupsSentToSyc) != len(endState) { - return false - } - - for _, tg := range groupsSentToSyc { - if _, ok := endState[tg.Source]; ok == false { - // The target group does not exist in the end state. - return false - } - - if reflect.DeepEqual(endState[tg.Source], *tg) == false { - // The target group has not reached its final state yet. - return false - } - - delete(endState, tg.Source) // Remove used target groups. 
- } - - return true - } - - for i, testCase := range testCases { - expectedGroups := make(map[string]config.TargetGroup) - for _, tpUpdates := range testCase.updates { - for _, update := range tpUpdates { - for _, targetGroup := range update.targetGroups { - expectedGroups[targetGroup.Source] = targetGroup - } - } - } - - finalize := make(chan bool) - - targetSet := NewTargetSet(&mockSyncer{ - sync: func(tgs []*config.TargetGroup) { - - endState := make(map[string]config.TargetGroup) - for k, v := range expectedGroups { - endState[k] = v - } - - if endStateAchieved(tgs, endState) == false { - return - } - - close(finalize) - }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - targetProviders := map[string]TargetProvider{} - for tpName, tpUpdates := range testCase.updates { - tp := newMockTargetProvider(tpUpdates) - targetProviders[tpName] = tp - } - - go targetSet.Run(ctx) - targetSet.UpdateProviders(targetProviders) - - select { - case <-time.After(20 * time.Second): - t.Errorf("%d. %q: Test timed out after 20 seconds. All targets should be sent within the timeout", i, testCase.title) - - case <-finalize: - // System successfully reached to the end state. - } - } -} - -func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { - verifyPresence := func(tgroups map[string]*config.TargetGroup, name string, present bool) { - if _, ok := tgroups[name]; ok != present { - msg := "" - if !present { - msg = "not " - } - t.Fatalf("'%s' should %sbe present in TargetSet.tgroups: %s", name, msg, tgroups) - } - } - - cfg := &config.ServiceDiscoveryConfig{} - - sOne := ` -static_configs: -- targets: ["foo:9090"] -- targets: ["bar:9090"] -` - if err := yaml.Unmarshal([]byte(sOne), cfg); err != nil { - t.Fatalf("Unable to load YAML config sOne: %s", err) - } - called := make(chan struct{}) - - ts := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { called <- struct{}{} }, - }) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go ts.Run(ctx) - - ts.UpdateProviders(ProvidersFromConfig(*cfg, nil)) - <-called - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", true) - - sTwo := ` -static_configs: -- targets: ["foo:9090"] -` - if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil { - t.Fatalf("Unable to load YAML config sTwo: %s", err) - } - - ts.UpdateProviders(ProvidersFromConfig(*cfg, nil)) - <-called - - verifyPresence(ts.tgroups, "static/0/0", true) - verifyPresence(ts.tgroups, "static/0/1", false) -} - -func TestTargetSetRunsSameTargetProviderMultipleTimes(t *testing.T) { - var wg sync.WaitGroup - - wg.Add(2) - - ts1 := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { wg.Done() }, - }) - - ts2 := NewTargetSet(&mockSyncer{ - sync: func([]*config.TargetGroup) { wg.Done() }, - }) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - updates := []update{ - { - targetGroups: []config.TargetGroup{{Source: "initial1"}, {Source: "initial2"}}, - interval: 10, - }, - } - - tp := newMockTargetProvider(updates) - targetProviders := map[string]TargetProvider{} - targetProviders["testProvider"] = tp - - go ts1.Run(ctx) - go ts2.Run(ctx) - - ts1.UpdateProviders(targetProviders) - ts2.UpdateProviders(targetProviders) - - finalize := make(chan struct{}) - go func() { - defer close(finalize) - wg.Wait() - }() - - select { - case <-time.After(20 * time.Second): - t.Error("Test timed out after 20 seconds. 
All targets should be sent within the timeout") - - case <-finalize: - if tp.callCount() != 2 { - t.Errorf("Was expecting 2 calls, received %d", tp.callCount()) - } - } -} - -type mockSyncer struct { - sync func(tgs []*config.TargetGroup) -} - -func (s *mockSyncer) Sync(tgs []*config.TargetGroup) { - if s.sync != nil { - s.sync(tgs) - } -} - -type update struct { - targetGroups []config.TargetGroup - interval time.Duration -} - -type mockTargetProvider struct { - _callCount *int32 - updates []update - up chan<- []*config.TargetGroup -} - -func newMockTargetProvider(updates []update) mockTargetProvider { - var callCount int32 - - tp := mockTargetProvider{ - _callCount: &callCount, - updates: updates, - } - - return tp -} - -func (tp mockTargetProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { - atomic.AddInt32(tp._callCount, 1) - tp.up = up - tp.sendUpdates() -} - -func (tp mockTargetProvider) sendUpdates() { - for _, update := range tp.updates { - - time.Sleep(update.interval * time.Millisecond) - - tgs := make([]*config.TargetGroup, len(update.targetGroups)) - for i := range update.targetGroups { - tgs[i] = &update.targetGroups[i] - } - - tp.up <- tgs - } -} - -func (tp mockTargetProvider) callCount() int { - return int(atomic.LoadInt32(tp._callCount)) -} diff --git a/discovery/manager.go b/discovery/manager.go new file mode 100644 index 0000000000..a76676a576 --- /dev/null +++ b/discovery/manager.go @@ -0,0 +1,301 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "context" + "fmt" + "sort" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/prometheus/config" + + "github.com/prometheus/prometheus/discovery/azure" + "github.com/prometheus/prometheus/discovery/consul" + "github.com/prometheus/prometheus/discovery/dns" + "github.com/prometheus/prometheus/discovery/ec2" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/gce" + "github.com/prometheus/prometheus/discovery/kubernetes" + "github.com/prometheus/prometheus/discovery/marathon" + "github.com/prometheus/prometheus/discovery/openstack" + "github.com/prometheus/prometheus/discovery/triton" + "github.com/prometheus/prometheus/discovery/zookeeper" +) + +// Discoverer provides information about target groups. It maintains a set +// of sources from which TargetGroups can originate. Whenever a discovery provider +// detects a potential change, it sends the TargetGroup through its channel. +// +// Discoverer does not know if an actual change happened. +// It does guarantee that it sends the new TargetGroup whenever a change happens. +// +// Discoverers should initially send a full set of all discoverable TargetGroups. +type Discoverer interface { + // Run hands a channel to the discovery provider (Consul, DNS, etc.) through which it can send + // updated target groups. + // Must return when the context gets canceled.
It should not close the update + // channel on returning. + Run(ctx context.Context, up chan<- []*config.TargetGroup) +} + +type poolKey struct { + setName string + provider string +} + +// byProvider implements sort.Interface for []poolKey based on the provider field. +// Sorting is needed so that we can have predictable tests. +type byProvider []poolKey + +func (a byProvider) Len() int { return len(a) } +func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider } + +// NewManager is the Discovery Manager constructor. +func NewManager(logger log.Logger) *Manager { + return &Manager{ + logger: logger, + actionCh: make(chan func(context.Context)), + syncCh: make(chan map[string][]*config.TargetGroup), + targets: make(map[poolKey][]*config.TargetGroup), + discoverCancel: []context.CancelFunc{}, + } +} + +// Manager maintains a set of discovery providers and sends each update to a channel used by other packages. +type Manager struct { + logger log.Logger + actionCh chan func(context.Context) + discoverCancel []context.CancelFunc + targets map[poolKey][]*config.TargetGroup + // The sync channel sends the updates in a map[targetSetName] where targetSetName is the job value from the scrape config. + syncCh chan map[string][]*config.TargetGroup +} + +// Run starts the background processing. +func (m *Manager) Run(ctx context.Context) error { + for { + select { + case f := <-m.actionCh: + f(ctx) + case <-ctx.Done(): + m.cancelDiscoverers() + return ctx.Err() + } + } +} + +// SyncCh returns a read-only channel used by all Discoverers to send target updates. +func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup { + return m.syncCh +} + +// ApplyConfig removes all running discovery providers and starts new ones using the provided config. +func (m *Manager) ApplyConfig(cfg *config.Config) error { + err := make(chan error) + m.actionCh <- func(ctx context.Context) { + m.cancelDiscoverers() + for _, scfg := range cfg.ScrapeConfigs { + for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { + m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov) + } + } + close(err) + } + + return <-err +} + +func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { + ctx, cancel := context.WithCancel(ctx) + updates := make(chan []*config.TargetGroup) + + m.discoverCancel = append(m.discoverCancel, cancel) + + go worker.Run(ctx, updates) + go m.runProvider(ctx, poolKey, updates) +} + +func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) { + for { + select { + case <-ctx.Done(): + return + case tgs, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done.
+ if !ok { + return + } + m.addGroup(poolKey, tgs) + m.syncCh <- m.allGroups(poolKey) + } + } +} + +func (m *Manager) cancelDiscoverers() { + for _, c := range m.discoverCancel { + c() + } + m.targets = make(map[poolKey][]*config.TargetGroup) + m.discoverCancel = nil +} + +func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) { + done := make(chan struct{}) + + m.actionCh <- func(ctx context.Context) { + if tg != nil { + m.targets[poolKey] = tg + } + close(done) + + } + <-done +} + +func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup { + tSets := make(chan map[string][]*config.TargetGroup) + + m.actionCh <- func(ctx context.Context) { + + // Sorting by the poolKey is needed so that we can have predictable tests. + var pKeys []poolKey + for pk := range m.targets { + pKeys = append(pKeys, pk) + } + sort.Sort(byProvider(pKeys)) + + tSetsAll := map[string][]*config.TargetGroup{} + for _, pk := range pKeys { + for _, tg := range m.targets[pk] { + if tg.Source != "" { // Don't add empty targets. + tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg) + } + } + } + tSets <- tSetsAll + } + return <-tSets + +} + +func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer { + providers := map[string]Discoverer{} + + app := func(mech string, i int, tp Discoverer) { + providers[fmt.Sprintf("%s/%d", mech, i)] = tp + } + + for i, c := range cfg.DNSSDConfigs { + app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns"))) + } + for i, c := range cfg.FileSDConfigs { + app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file"))) + } + for i, c := range cfg.ConsulSDConfigs { + k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err) + continue + } + app("consul", i, k) + } + for i, c := range cfg.MarathonSDConfigs { + t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err) + continue + } + app("marathon", i, t) + } + for i, c := range cfg.KubernetesSDConfigs { + k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err) + continue + } + app("kubernetes", i, k) + } + for i, c := range cfg.ServersetSDConfigs { + app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper"))) + } + for i, c := range cfg.NerveSDConfigs { + app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve"))) + } + for i, c := range cfg.EC2SDConfigs { + app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2"))) + } + for i, c := range cfg.OpenstackSDConfigs { + openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err) + continue + } + app("openstack", i, openstackd) + } + + for i, c := range cfg.GCESDConfigs { + gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce")) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err) + continue + } + app("gce", i, gced) + } + for i, c := range cfg.AzureSDConfigs { + app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure"))) + } + for i, c := range 
cfg.TritonSDConfigs { + t, err := triton.New(log.With(m.logger, "discovery", "triton"), c) + if err != nil { + level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err) + continue + } + app("triton", i, t) + } + if len(cfg.StaticConfigs) > 0 { + app("static", 0, NewStaticProvider(cfg.StaticConfigs)) + } + + return providers +} + +// StaticProvider holds a list of target groups that never change. +type StaticProvider struct { + TargetGroups []*config.TargetGroup +} + +// NewStaticProvider returns a StaticProvider configured with the given +// target groups. +func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider { + for i, tg := range groups { + tg.Source = fmt.Sprintf("%d", i) + } + return &StaticProvider{groups} +} + +// Run implements the Discoverer interface. +func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + // We still have to consider that the consumer exits right away in which case + // the context will be canceled. + select { + case ch <- sd.TargetGroups: + case <-ctx.Done(): + } + close(ch) +} diff --git a/discovery/manager_test.go b/discovery/manager_test.go new file mode 100644 index 0000000000..748d19f18e --- /dev/null +++ b/discovery/manager_test.go @@ -0,0 +1,730 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "context" + "fmt" + "reflect" + "strconv" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + yaml "gopkg.in/yaml.v2" +) + +// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order. +func TestDiscoveryManagerSyncCalls(t *testing.T) { + + // The order in which the updates are sent is determined by the interval passed to the mock discovery adapter. + // The final targets array is ordered alphabetically by the name of the discoverer. + // For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
testCases := []struct { + title string + updates map[string][]update + expectedTargets [][]*config.TargetGroup + }{ + { + title: "Single TP no updates", + updates: map[string][]update{ + "tp1": {}, + }, + expectedTargets: nil, + }, + { + title: "Multiple TPs no updates", + updates: map[string][]update{ + "tp1": {}, + "tp2": {}, + "tp3": {}, + }, + expectedTargets: nil, + }, + { + title: "Single TP empty initials", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{}, + interval: 5, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + {}, + }, + }, + { + title: "Multiple TPs empty initials", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{}, + interval: 5, + }, + }, + "tp2": { + { + targetGroups: []config.TargetGroup{}, + interval: 200, + }, + }, + "tp3": { + { + targetGroups: []config.TargetGroup{}, + interval: 100, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + {}, + {}, + {}, + }, + }, + { + title: "Single TP initials only", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }}, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + { + title: "Multiple TPs initials only", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + }, + }, + "tp2": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, + interval: 10, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + }, + }, + }, + { + title: "Single TP initials followed by empty updates", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, + }, + { + targetGroups: []config.TargetGroup{}, + interval: 10, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + {}, + }, + }, + { + title: "Single TP initials and new groups", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 0, + }, + { + targetGroups: []config.TargetGroup{ + { +
Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 10, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + }, + }, + { + title: "Multiple TPs initials and new groups", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10, + }, + { + targetGroups: []config.TargetGroup{ + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 500, + }, + }, + "tp2": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 100, + }, + { + targetGroups: []config.TargetGroup{ + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 10, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + }, + }, + { + title: "One tp initials arrive after other tp updates.", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 10, + }, + { + targetGroups: []config.TargetGroup{ + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + 
interval: 150, + }, + }, + "tp2": { + { + targetGroups: []config.TargetGroup{ + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + interval: 200, + }, + { + targetGroups: []config.TargetGroup{ + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + interval: 100, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "tp1-initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "tp1-initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-initial1", + Targets: []model.LabelSet{{"__instance__": "5"}}, + }, + { + Source: "tp2-initial2", + Targets: []model.LabelSet{{"__instance__": "6"}}, + }, + }, + { + { + Source: "tp1-update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "tp1-update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + { + Source: "tp2-update1", + Targets: []model.LabelSet{{"__instance__": "7"}}, + }, + { + Source: "tp2-update2", + Targets: []model.LabelSet{{"__instance__": "8"}}, + }, + }, + }, + }, + + { + title: "Single TP Single provider empty update in between", + updates: map[string][]update{ + "tp1": { + { + targetGroups: []config.TargetGroup{ + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + interval: 30, + }, + { + targetGroups: []config.TargetGroup{}, + interval: 10, + }, + { + targetGroups: []config.TargetGroup{ + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + interval: 300, + }, + }, + }, + expectedTargets: [][]*config.TargetGroup{ + { + { + Source: "initial1", + Targets: []model.LabelSet{{"__instance__": "1"}}, + }, + { + Source: "initial2", + Targets: []model.LabelSet{{"__instance__": "2"}}, + }, + }, + {}, + { + { + Source: "update1", + Targets: []model.LabelSet{{"__instance__": "3"}}, + }, + { + Source: "update2", + Targets: []model.LabelSet{{"__instance__": "4"}}, + }, + }, + }, + }, + } + + for testIndex, testCase := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) + + var totalUpdatesCount int + for tpName, update := range testCase.updates { + provider := newMockDiscoveryProvider(update) + discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider) + + if len(update) > 0 { + totalUpdatesCount = totalUpdatesCount + len(update) + } + } + + Loop: + for x := 0; x < totalUpdatesCount; x++ { + select { + case <-time.After(10 * time.Second): + t.Errorf("%v. 
%q: no update arrived within the timeout limit", x, testCase.title) + break Loop + case tsetMap := <-discoveryManager.SyncCh(): + for _, received := range tsetMap { + if !reflect.DeepEqual(received, testCase.expectedTargets[x]) { + var receivedFormatted string + for _, receivedTargets := range received { + receivedFormatted = receivedFormatted + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets) + } + var expectedFormatted string + for _, expectedTargets := range testCase.expectedTargets[x] { + expectedFormatted = expectedFormatted + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets) + } + + t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v", + x, testCase.title, + receivedFormatted, + expectedFormatted) + } + } + } + } +} + +func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) { + verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) { + if _, ok := tSets[poolKey]; !ok { + t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets) + return + } + + match := false + var mergedTargets string + for _, targetGroup := range tSets[poolKey] { + + for _, l := range targetGroup.Targets { + mergedTargets = mergedTargets + " " + l.String() + if l.String() == label { + match = true + } + } + + } + if match != present { + msg := "" + if !present { + msg = "not" + } + t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets) + } + } + + cfg := &config.Config{} + + sOne := ` +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ["foo:9090"] + - targets: ["bar:9090"] +` + if err := yaml.Unmarshal([]byte(sOne), cfg); err != nil { + t.Fatalf("Unable to load YAML config sOne: %s", err) + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(nil) + go discoveryManager.Run(ctx) + + discoveryManager.ApplyConfig(cfg) + + <-discoveryManager.SyncCh() + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true) + + sTwo := ` +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ["foo:9090"] +` + if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil { + t.Fatalf("Unable to load YAML config sTwo: %s", err) + } + discoveryManager.ApplyConfig(cfg) + + <-discoveryManager.SyncCh() + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true) + verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false) +} + +type update struct { + targetGroups []config.TargetGroup + interval time.Duration +} + +type mockdiscoveryProvider struct { + updates []update + up chan<- []*config.TargetGroup +} + +func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider { + + tp := mockdiscoveryProvider{ + updates: updates, + } + return tp +} + +func (tp mockdiscoveryProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) { + tp.up = up + tp.sendUpdates() +} + +func (tp mockdiscoveryProvider) sendUpdates() { + for _, update := range tp.updates { + + time.Sleep(update.interval * time.Millisecond) + + tgs := make([]*config.TargetGroup, len(update.targetGroups)) + for i := range update.targetGroups { + tgs[i] = &update.targetGroups[i] + } + tp.up <- tgs + } +}
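The mock above captures the provider contract this change builds on: anything with a `Run(ctx context.Context, up chan<- []*config.TargetGroup)` method can feed the discovery manager. A minimal, self-contained sketch of that contract and a consumer; `staticProvider` is hypothetical and written only for illustration, and the import paths assume this commit's tree:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

// staticProvider satisfies the same Run contract as mockdiscoveryProvider:
// it pushes one fixed batch of target groups, then waits for cancellation.
type staticProvider struct {
	groups []*config.TargetGroup
}

func (p staticProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	select {
	case up <- p.groups:
	case <-ctx.Done():
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	up := make(chan []*config.TargetGroup)
	p := staticProvider{groups: []*config.TargetGroup{
		{Source: "static/0", Targets: []model.LabelSet{{"__address__": "foo:9090"}}},
	}}
	go p.Run(ctx, up)

	// The discovery manager plays this consumer role for real providers.
	select {
	case tgs := <-up:
		fmt.Println("received", len(tgs), "target group(s)")
	case <-time.After(time.Second):
		fmt.Println("no update arrived within the timeout")
	}
}
```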
diff --git a/notifier/notifier.go b/notifier/notifier.go index 36e63fd6e7..d2c14b7655 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -35,7 +35,6 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/util/httputil" @@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs amSets := []*alertmanagerSet{} - ctx, cancel := context.WithCancel(n.ctx) for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs { ams, err := newAlertmanagerSet(cfg, n.logger) @@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { amSets = append(amSets, ams) } - // After all sets were created successfully, start them and cancel the - // old ones. - for _, ams := range amSets { - go ams.ts.Run(ctx) - ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger)) - } - if n.cancelDiscovery != nil { - n.cancelDiscovery() - } - - n.cancelDiscovery = cancel n.alertmanagers = amSets return nil @@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL { // alertmanagerSet contains a set of Alertmanagers discovered via a group of service // discovery definitions that have a common configuration on how alerts should be sent. type alertmanagerSet struct { - ts *discovery.TargetSet cfg *config.AlertmanagerConfig client *http.Client @@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale cfg: cfg, logger: logger, } - s.ts = discovery.NewTargetSet(s) - return s, nil } diff --git a/retrieval/manager.go b/retrieval/manager.go new file mode 100644 index 0000000000..7d9de9445a --- /dev/null +++ b/retrieval/manager.go @@ -0,0 +1,162 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retrieval + +import ( + "fmt" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" +) + +// Appendable returns an Appender. +type Appendable interface { + Appender() (storage.Appender, error) +} + +// NewScrapeManager is the ScrapeManager constructor. +func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager { + + return &ScrapeManager{ + append: app, + logger: logger, + actionCh: make(chan func()), + scrapeConfigs: make(map[string]*config.ScrapeConfig), + scrapePools: make(map[string]*scrapePool), + graceShut: make(chan struct{}), + } +}
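With the constructor in place (the `ScrapeManager` type and its methods follow below), the intended lifecycle is: start `Run` with a channel of target-group updates, install jobs via `ApplyConfig`, then `Stop`. A hedged end-to-end sketch; `nopAppendable` is hypothetical and only stands in for the fanout storage that Prometheus actually passes:

```go
package main

import (
	"github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/retrieval"
	"github.com/prometheus/prometheus/storage"
)

// nopAppendable satisfies Appendable for illustration only; its Appender
// must never actually be used to write samples.
type nopAppendable struct{}

func (nopAppendable) Appender() (storage.Appender, error) { return nil, nil }

func main() {
	m := retrieval.NewScrapeManager(log.NewNopLogger(), nopAppendable{})

	// Run blocks, consuming updates until Stop closes graceShut.
	syncCh := make(chan map[string][]*config.TargetGroup)
	go m.Run(syncCh)

	// Install scrape configs; with an empty config this is a no-op.
	m.ApplyConfig(&config.Config{})

	// Each message triggers reload(); an empty map starts no pools.
	syncCh <- map[string][]*config.TargetGroup{}

	m.Stop()
}
```

In Prometheus itself the channel is the discovery manager's `SyncCh()`, so scrape pools are created and synced as discovery pushes new groups.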
+// ScrapeManager maintains a set of scrape pools and manages start/stop cycles +// when receiving new target groups from the discovery manager. +type ScrapeManager struct { + logger log.Logger + append Appendable + scrapeConfigs map[string]*config.ScrapeConfig + scrapePools map[string]*scrapePool + actionCh chan func() + graceShut chan struct{} +} + +// Run starts background processing to handle target updates and reload the scraping loops. +func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error { + level.Info(m.logger).Log("msg", "Starting scrape manager...") + + for { + select { + case f := <-m.actionCh: + f() + case ts := <-tsets: + if err := m.reload(ts); err != nil { + level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err) + } + case <-m.graceShut: + return nil + } + } +} + +// Stop cancels all running scrape pools and blocks until all have exited. +func (m *ScrapeManager) Stop() { + for _, sp := range m.scrapePools { + sp.stop() + } + close(m.graceShut) +} + +// ApplyConfig resets the manager's scrape job configurations as defined by the new cfg. +func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error { + done := make(chan struct{}) + m.actionCh <- func() { + for _, scfg := range cfg.ScrapeConfigs { + m.scrapeConfigs[scfg.JobName] = scfg + } + close(done) + } + <-done + return nil +} + +// TargetMap returns a map of active and dropped targets and their corresponding scrape config job name. +func (m *ScrapeManager) TargetMap() map[string][]*Target { + targetsMap := make(chan map[string][]*Target) + m.actionCh <- func() { + targets := make(map[string][]*Target) + for jobName, sp := range m.scrapePools { + sp.mtx.RLock() + for _, t := range sp.targets { + targets[jobName] = append(targets[jobName], t) + } + targets[jobName] = append(targets[jobName], sp.droppedTargets...) + sp.mtx.RUnlock() + } + targetsMap <- targets + } + return <-targetsMap +} + +// Targets returns the targets currently being scraped. +func (m *ScrapeManager) Targets() []*Target { + targets := make(chan []*Target) + m.actionCh <- func() { + var t []*Target + for _, p := range m.scrapePools { + p.mtx.RLock() + for _, tt := range p.targets { + t = append(t, tt) + } + p.mtx.RUnlock() + } + targets <- t + } + return <-targets +}
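`TargetMap` and `Targets` complete a deliberate pattern: every read or write of the manager's internal maps travels through `actionCh` and executes inside the `Run` loop, so a single goroutine owns the state and no manager-level mutex is needed. A self-contained sketch of the same closure-over-a-channel technique; the `counter` type is invented for illustration, not part of this change:

```go
package main

import "fmt"

// counter serializes access to n by sending closures to its run loop,
// mirroring how ScrapeManager uses actionCh.
type counter struct {
	n        int
	actionCh chan func()
	quit     chan struct{}
}

func (c *counter) run() {
	for {
		select {
		case f := <-c.actionCh:
			f() // only this goroutine ever touches c.n
		case <-c.quit:
			return
		}
	}
}

// value reads n inside the run loop and hands the snapshot back on a
// channel, just like TargetMap and Targets above.
func (c *counter) value() int {
	res := make(chan int)
	c.actionCh <- func() { res <- c.n }
	return <-res
}

func (c *counter) inc() {
	done := make(chan struct{})
	c.actionCh <- func() { c.n++; close(done) }
	<-done
}

func main() {
	c := &counter{actionCh: make(chan func()), quit: make(chan struct{})}
	go c.run()
	c.inc()
	fmt.Println(c.value()) // prints 1
	close(c.quit)
}
```

The only locking left in `TargetMap`/`Targets` is each pool's own `mtx`, which still guards the target maps that the scrape loops mutate concurrently.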
+ +func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error { + for tsetName, tgroup := range t { + scrapeConfig, ok := m.scrapeConfigs[tsetName] + if !ok { + return fmt.Errorf("target set '%v' doesn't have a valid config", tsetName) + } + + // If a scrape pool for this target set doesn't exist yet, start a new one. + existing, ok := m.scrapePools[tsetName] + if !ok { + sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + m.scrapePools[tsetName] = sp + sp.Sync(tgroup) + + } else { + existing.Sync(tgroup) + } + + // Cleanup - check the config and cancel the scrape loops if they no longer exist in the scrape config. + jobs := make(map[string]struct{}) + + for k := range m.scrapeConfigs { + jobs[k] = struct{}{} + } + + for name, sp := range m.scrapePools { + if _, ok := jobs[name]; !ok { + sp.stop() + delete(m.scrapePools, name) + } + } + } + return nil +} diff --git a/retrieval/targetmanager_test.go b/retrieval/manager_test.go similarity index 100% rename from retrieval/targetmanager_test.go rename to retrieval/manager_test.go diff --git a/retrieval/scrape.go b/retrieval/scrape.go index ed1aac14db..d07663e9e2 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -117,7 +117,6 @@ func init() { type scrapePool struct { appendable Appendable logger log.Logger - ctx context.Context mtx sync.RWMutex config *config.ScrapeConfig @@ -127,6 +126,7 @@ type scrapePool struct { targets map[uint64]*Target droppedTargets []*Target loops map[uint64]loop + cancel context.CancelFunc // Constructor for new scrape loops. This is settable for testing convenience. newLoop func(*Target, scraper) loop @@ -136,7 +136,7 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { +func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool { if logger == nil { logger = log.NewNopLogger() } @@ -149,17 +149,20 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable buffers := pool.NewBytesPool(1e3, 100e6, 3) + ctx, cancel := context.WithCancel(context.Background()) sp := &scrapePool{ + cancel: cancel, appendable: app, config: cfg, - ctx: ctx, client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, logger: logger, } sp.newLoop = func(t *Target, s scraper) loop { - return newScrapeLoop(sp.ctx, s, + return newScrapeLoop( + ctx, + s, log.With(logger, "target", t), buffers, func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) }, @@ -173,6 +176,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable // stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() { + sp.cancel() var wg sync.WaitGroup sp.mtx.Lock() @@ -189,7 +193,6 @@ func (sp *scrapePool) stop() { delete(sp.loops, fp) delete(sp.targets, fp) } - wg.Wait() } @@ -582,8 +585,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { } } -func newScrapeLoop( - ctx context.Context, +func newScrapeLoop(ctx context.Context, sc scraper, l log.Logger, buffers *pool.BytesPool, @@ -605,8 +607,8 @@ func newScrapeLoop( sampleMutator: sampleMutator, reportSampleMutator: reportSampleMutator, stopped: make(chan struct{}), - ctx: ctx, l: l, + ctx: ctx, } sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index e43ed80a7c..b3a0466ab2 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp = newScrapePool(context.Background(), cfg, app, nil) + sp = newScrapePool(cfg, app, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -75,6 +75,7 @@ func TestScrapePoolStop(t *testing.T) { sp := &scrapePool{ targets: map[uint64]*Target{}, loops: map[uint64]loop{}, + cancel: func() {}, } var mtx sync.Mutex stopped := map[uint64]bool{} @@ -231,7 +232,7 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp := newScrapePool(context.Background(), cfg, app, nil) + sp := newScrapePool(cfg, app, nil) wrapped := sp.appender() diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go deleted file mode 100644 index f826406642..0000000000 --- a/retrieval/targetmanager.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package retrieval - -import ( - "context" - "sync" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - - "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/storage" -) - -// TargetManager maintains a set of targets, starts and stops their scraping and -// creates the new targets based on the target groups it receives from various -// target providers. -type TargetManager struct { - append Appendable - scrapeConfigs []*config.ScrapeConfig - - mtx sync.RWMutex - ctx context.Context - cancel func() - wg sync.WaitGroup - - // Set of unqiue targets by scrape configuration. - targetSets map[string]*targetSet - logger log.Logger - starting chan struct{} -} - -type targetSet struct { - ctx context.Context - cancel func() - - ts *discovery.TargetSet - sp *scrapePool -} - -// Appendable returns an Appender. -type Appendable interface { - Appender() (storage.Appender, error) -} - -// NewTargetManager creates a new TargetManager. 
-func NewTargetManager(app Appendable, logger log.Logger) *TargetManager { - return &TargetManager{ - append: app, - targetSets: map[string]*targetSet{}, - logger: logger, - starting: make(chan struct{}), - } -} - -// Run starts background processing to handle target updates. -func (tm *TargetManager) Run() { - level.Info(tm.logger).Log("msg", "Starting target manager...") - - tm.mtx.Lock() - - tm.ctx, tm.cancel = context.WithCancel(context.Background()) - tm.reload() - - tm.mtx.Unlock() - close(tm.starting) - - tm.wg.Wait() -} - -// Stop all background processing. -func (tm *TargetManager) Stop() { - <-tm.starting - level.Info(tm.logger).Log("msg", "Stopping target manager...") - - tm.mtx.Lock() - // Cancel the base context, this will cause all target providers to shut down - // and all in-flight scrapes to abort immmediately. - // Started inserts will be finished before terminating. - tm.cancel() - tm.mtx.Unlock() - - // Wait for all scrape inserts to complete. - tm.wg.Wait() - - level.Info(tm.logger).Log("msg", "Target manager stopped") -} - -func (tm *TargetManager) reload() { - jobs := map[string]struct{}{} - - // Start new target sets and update existing ones. - for _, scfg := range tm.scrapeConfigs { - jobs[scfg.JobName] = struct{}{} - - ts, ok := tm.targetSets[scfg.JobName] - if !ok { - ctx, cancel := context.WithCancel(tm.ctx) - ts = &targetSet{ - ctx: ctx, - cancel: cancel, - sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)), - } - ts.ts = discovery.NewTargetSet(ts.sp) - - tm.targetSets[scfg.JobName] = ts - - tm.wg.Add(1) - - go func(ts *targetSet) { - // Run target set, which blocks until its context is canceled. - // Gracefully shut down pending scrapes in the scrape pool afterwards. - ts.ts.Run(ctx) - ts.sp.stop() - tm.wg.Done() - }(ts) - } else { - ts.sp.reload(scfg) - } - ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger)) - } - - // Remove old target sets. Waiting for scrape pools to complete pending - // scrape inserts is already guaranteed by the goroutine that started the target set. - for name, ts := range tm.targetSets { - if _, ok := jobs[name]; !ok { - ts.cancel() - delete(tm.targetSets, name) - } - } -} - -// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (tm *TargetManager) TargetMap() map[string][]*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targetsMap := make(map[string][]*Target) - for jobName, ps := range tm.targetSets { - ps.sp.mtx.RLock() - for _, t := range ps.sp.targets { - targetsMap[jobName] = append(targetsMap[jobName], t) - } - targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...) - ps.sp.mtx.RUnlock() - } - return targetsMap -} - -// Targets returns the targets currently being scraped. -func (tm *TargetManager) Targets() []*Target { - tm.mtx.RLock() - defer tm.mtx.RUnlock() - - targets := []*Target{} - for _, ps := range tm.targetSets { - ps.sp.mtx.RLock() - - for _, t := range ps.sp.targets { - targets = append(targets, t) - } - - ps.sp.mtx.RUnlock() - } - - return targets -} - -// ApplyConfig resets the manager's target providers and job configurations as defined -// by the new cfg. The state of targets that are valid in the new configuration remains unchanged. 
-func (tm *TargetManager) ApplyConfig(cfg *config.Config) error { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - tm.scrapeConfigs = cfg.ScrapeConfigs - - if tm.ctx != nil { - tm.reload() - } - return nil -} diff --git a/web/web.go b/web/web.go index d9875150e6..74752f72ef 100644 --- a/web/web.go +++ b/web/web.go @@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"} type Handler struct { logger log.Logger - targetManager *retrieval.TargetManager + scrapeManager *retrieval.ScrapeManager ruleManager *rules.Manager queryEngine *promql.Engine context context.Context @@ -125,7 +125,7 @@ type Options struct { TSDB func() *tsdb.DB Storage storage.Storage QueryEngine *promql.Engine - TargetManager *retrieval.TargetManager + ScrapeManager *retrieval.ScrapeManager RuleManager *rules.Manager Notifier *notifier.Notifier Version *PrometheusVersion @@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler { flagsMap: o.Flags, context: o.Context, - targetManager: o.TargetManager, + scrapeManager: o.ScrapeManager, ruleManager: o.RuleManager, queryEngine: o.QueryEngine, tsdb: o.TSDB, @@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler { ready: 0, } - h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier, + h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier, func() config.Config { h.mtx.RLock() defer h.mtx.RUnlock() @@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error { h.options.QueryEngine, h.options.Storage.Querier, func() []*retrieval.Target { - return h.options.TargetManager.Targets() + return h.options.ScrapeManager.Targets() }, func() []*url.URL { return h.options.Notifier.Alertmanagers() @@ -587,7 +587,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) { func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { var index []string - targets := h.targetManager.TargetMap() + targets := h.scrapeManager.TargetMap() for job := range targets { index = append(index, job) } @@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := map[string][]*retrieval.Target{} - for _, t := range h.targetManager.Targets() { + for _, t := range h.scrapeManager.Targets() { job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) } diff --git a/web/web_test.go b/web/web_test.go index b17a0b1c20..7b02935437 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -96,7 +96,7 @@ func TestReadyAndHealthy(t *testing.T) { Context: nil, Storage: &tsdb.ReadyStorage{}, QueryEngine: nil, - TargetManager: nil, + ScrapeManager: nil, RuleManager: nil, Notifier: nil, RoutePrefix: "/", @@ -187,7 +187,7 @@ func TestRoutePrefix(t *testing.T) { Context: nil, Storage: &tsdb.ReadyStorage{}, QueryEngine: nil, - TargetManager: nil, + ScrapeManager: nil, RuleManager: nil, Notifier: nil, RoutePrefix: "/prometheus",