mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
discovery: move TargetProvider and multi-constructor
This commit is contained in:
parent
bd0048477c
commit
7bd9508c9b
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package azure
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -45,15 +45,13 @@ const (
|
||||||
var (
|
var (
|
||||||
azureSDRefreshFailuresCount = prometheus.NewCounter(
|
azureSDRefreshFailuresCount = prometheus.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_azure_refresh_failures_total",
|
||||||
Name: "sd_azure_refresh_failures_total",
|
Help: "Number of Azure-SD refresh failures.",
|
||||||
Help: "Number of Azure-SD refresh failures.",
|
|
||||||
})
|
})
|
||||||
azureSDRefreshDuration = prometheus.NewSummary(
|
azureSDRefreshDuration = prometheus.NewSummary(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_azure_refresh_duration_seconds",
|
||||||
Name: "sd_azure_refresh_duration_seconds",
|
Help: "The duration of a Azure-SD refresh in seconds.",
|
||||||
Help: "The duration of a Azure-SD refresh in seconds.",
|
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -70,8 +68,8 @@ type AzureDiscovery struct {
|
||||||
port int
|
port int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAzureDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
|
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
|
||||||
func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
|
func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
|
||||||
return &AzureDiscovery{
|
return &AzureDiscovery{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
interval: time.Duration(cfg.RefreshInterval),
|
interval: time.Duration(cfg.RefreshInterval),
|
||||||
|
|
|
@ -14,30 +14,124 @@
|
||||||
package discovery
|
package discovery
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery/azure"
|
||||||
"github.com/prometheus/prometheus/retrieval/discovery/consul"
|
"github.com/prometheus/prometheus/retrieval/discovery/consul"
|
||||||
"github.com/prometheus/prometheus/retrieval/discovery/dns"
|
"github.com/prometheus/prometheus/retrieval/discovery/dns"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery/ec2"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery/file"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery/gce"
|
||||||
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
|
"github.com/prometheus/prometheus/retrieval/discovery/kubernetes"
|
||||||
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
|
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery/zookeeper"
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewConsul creates a new Consul based Discovery.
|
// A TargetProvider provides information about target groups. It maintains a set
|
||||||
func NewConsul(cfg *config.ConsulSDConfig) (*consul.Discovery, error) {
|
// of sources from which TargetGroups can originate. Whenever a target provider
|
||||||
return consul.NewDiscovery(cfg)
|
// detects a potential change, it sends the TargetGroup through its provided channel.
|
||||||
|
//
|
||||||
|
// The TargetProvider does not have to guarantee that an actual change happened.
|
||||||
|
// It does guarantee that it sends the new TargetGroup whenever a change happens.
|
||||||
|
//
|
||||||
|
// Providers must initially send all known target groups as soon as it can.
|
||||||
|
type TargetProvider interface {
|
||||||
|
// Run hands a channel to the target provider through which it can send
|
||||||
|
// updated target groups. The channel must be closed by the target provider
|
||||||
|
// if no more updates will be sent.
|
||||||
|
// On receiving from done Run must return.
|
||||||
|
Run(ctx context.Context, up chan<- []*config.TargetGroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKubernetesDiscovery creates a Kubernetes service discovery based on the passed-in configuration.
|
// ProvidersFromConfig returns all TargetProviders configured in cfg.
|
||||||
func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubernetes, error) {
|
func ProvidersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
|
||||||
return kubernetes.New(log.Base(), conf)
|
providers := map[string]TargetProvider{}
|
||||||
|
|
||||||
|
app := func(mech string, i int, tp TargetProvider) {
|
||||||
|
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, c := range cfg.DNSSDConfigs {
|
||||||
|
app("dns", i, dns.NewDiscovery(c))
|
||||||
|
}
|
||||||
|
for i, c := range cfg.FileSDConfigs {
|
||||||
|
app("file", i, file.NewDiscovery(c))
|
||||||
|
}
|
||||||
|
for i, c := range cfg.ConsulSDConfigs {
|
||||||
|
k, err := consul.NewDiscovery(c)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot create Consul discovery: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
app("consul", i, k)
|
||||||
|
}
|
||||||
|
for i, c := range cfg.MarathonSDConfigs {
|
||||||
|
m, err := marathon.NewDiscovery(c)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot create Marathon discovery: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
app("marathon", i, m)
|
||||||
|
}
|
||||||
|
for i, c := range cfg.KubernetesSDConfigs {
|
||||||
|
k, err := kubernetes.New(log.Base(), c)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot create Kubernetes discovery: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
app("kubernetes", i, k)
|
||||||
|
}
|
||||||
|
for i, c := range cfg.ServersetSDConfigs {
|
||||||
|
app("serverset", i, zookeeper.NewServersetDiscovery(c))
|
||||||
|
}
|
||||||
|
for i, c := range cfg.NerveSDConfigs {
|
||||||
|
app("nerve", i, zookeeper.NewNerveDiscovery(c))
|
||||||
|
}
|
||||||
|
for i, c := range cfg.EC2SDConfigs {
|
||||||
|
app("ec2", i, ec2.NewDiscovery(c))
|
||||||
|
}
|
||||||
|
for i, c := range cfg.GCESDConfigs {
|
||||||
|
gced, err := gce.NewDiscovery(c)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Cannot initialize GCE discovery: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
app("gce", i, gced)
|
||||||
|
}
|
||||||
|
for i, c := range cfg.AzureSDConfigs {
|
||||||
|
app("azure", i, azure.NewDiscovery(c))
|
||||||
|
}
|
||||||
|
if len(cfg.StaticConfigs) > 0 {
|
||||||
|
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
|
||||||
|
}
|
||||||
|
|
||||||
|
return providers
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMarathon creates a new Marathon based discovery.
|
// StaticProvider holds a list of target groups that never change.
|
||||||
func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) {
|
type StaticProvider struct {
|
||||||
return marathon.NewDiscovery(conf)
|
TargetGroups []*config.TargetGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDNS creates a new DNS based discovery.
|
// NewStaticProvider returns a StaticProvider configured with the given
|
||||||
func NewDNS(conf *config.DNSSDConfig) *dns.Discovery {
|
// target groups.
|
||||||
return dns.NewDiscovery(conf)
|
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
|
||||||
|
for i, tg := range groups {
|
||||||
|
tg.Source = fmt.Sprintf("%d", i)
|
||||||
|
}
|
||||||
|
return &StaticProvider{groups}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run implements the TargetProvider interface.
|
||||||
|
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
||||||
|
// We still have to consider that the consumer exits right away in which case
|
||||||
|
// the context will be canceled.
|
||||||
|
select {
|
||||||
|
case ch <- sd.TargetGroups:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package ec2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -50,15 +50,13 @@ const (
|
||||||
var (
|
var (
|
||||||
ec2SDRefreshFailuresCount = prometheus.NewCounter(
|
ec2SDRefreshFailuresCount = prometheus.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_ec2_refresh_failures_total",
|
||||||
Name: "sd_ec2_refresh_failures_total",
|
Help: "The number of EC2-SD scrape failures.",
|
||||||
Help: "The number of EC2-SD scrape failures.",
|
|
||||||
})
|
})
|
||||||
ec2SDRefreshDuration = prometheus.NewSummary(
|
ec2SDRefreshDuration = prometheus.NewSummary(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_ec2_refresh_duration_seconds",
|
||||||
Name: "sd_ec2_refresh_duration_seconds",
|
Help: "The duration of a EC2-SD refresh in seconds.",
|
||||||
Help: "The duration of a EC2-SD refresh in seconds.",
|
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -76,8 +74,8 @@ type EC2Discovery struct {
|
||||||
port int
|
port int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEC2Discovery returns a new EC2Discovery which periodically refreshes its targets.
|
// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
|
||||||
func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
|
func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery {
|
||||||
creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "")
|
creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "")
|
||||||
if conf.AccessKey == "" && conf.SecretKey == "" {
|
if conf.AccessKey == "" && conf.SecretKey == "" {
|
||||||
creds = nil
|
creds = nil
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package file
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -36,15 +36,13 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"
|
||||||
var (
|
var (
|
||||||
fileSDScanDuration = prometheus.NewSummary(
|
fileSDScanDuration = prometheus.NewSummary(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_file_scan_duration_seconds",
|
||||||
Name: "sd_file_scan_duration_seconds",
|
Help: "The duration of the File-SD scan in seconds.",
|
||||||
Help: "The duration of the File-SD scan in seconds.",
|
|
||||||
})
|
})
|
||||||
fileSDReadErrorsCount = prometheus.NewCounter(
|
fileSDReadErrorsCount = prometheus.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_file_read_errors_total",
|
||||||
Name: "sd_file_read_errors_total",
|
Help: "The number of File-SD read errors.",
|
||||||
Help: "The number of File-SD read errors.",
|
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -67,8 +65,8 @@ type FileDiscovery struct {
|
||||||
lastRefresh map[string]int
|
lastRefresh map[string]int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileDiscovery returns a new file discovery for the given paths.
|
// NewDiscovery returns a new file discovery for the given paths.
|
||||||
func NewFileDiscovery(conf *config.FileSDConfig) *FileDiscovery {
|
func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery {
|
||||||
return &FileDiscovery{
|
return &FileDiscovery{
|
||||||
paths: conf.Files,
|
paths: conf.Files,
|
||||||
interval: time.Duration(conf.RefreshInterval),
|
interval: time.Duration(conf.RefreshInterval),
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package file
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -41,7 +41,7 @@ func testFileSD(t *testing.T, ext string) {
|
||||||
conf.RefreshInterval = model.Duration(1 * time.Hour)
|
conf.RefreshInterval = model.Duration(1 * time.Hour)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fsd = NewFileDiscovery(&conf)
|
fsd = NewDiscovery(&conf)
|
||||||
ch = make(chan []*config.TargetGroup)
|
ch = make(chan []*config.TargetGroup)
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"targets": ["localhost:9090", "example.org:443"],
|
|
||||||
"labels": {
|
|
||||||
"foo": "bar"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"targets": ["my.domain"]
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -1,5 +0,0 @@
|
||||||
- targets: ['localhost:9090', 'example.org:443']
|
|
||||||
labels:
|
|
||||||
foo: bar
|
|
||||||
|
|
||||||
- targets: ['my.domain']
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package gce
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -52,15 +52,13 @@ const (
|
||||||
var (
|
var (
|
||||||
gceSDRefreshFailuresCount = prometheus.NewCounter(
|
gceSDRefreshFailuresCount = prometheus.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_gce_refresh_failures_total",
|
||||||
Name: "sd_gce_refresh_failures_total",
|
Help: "The number of GCE-SD refresh failures.",
|
||||||
Help: "The number of GCE-SD refresh failures.",
|
|
||||||
})
|
})
|
||||||
gceSDRefreshDuration = prometheus.NewSummary(
|
gceSDRefreshDuration = prometheus.NewSummary(
|
||||||
prometheus.SummaryOpts{
|
prometheus.SummaryOpts{
|
||||||
Namespace: namespace,
|
Name: "prometheus_sd_gce_refresh_duration",
|
||||||
Name: "sd_gce_refresh_duration",
|
Help: "The duration of a GCE-SD refresh in seconds.",
|
||||||
Help: "The duration of a GCE-SD refresh in seconds.",
|
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -84,7 +82,7 @@ type GCEDiscovery struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets.
|
// NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets.
|
||||||
func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
|
func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
|
||||||
gd := &GCEDiscovery{
|
gd := &GCEDiscovery{
|
||||||
project: conf.Project,
|
project: conf.Project,
|
||||||
zone: conf.Zone,
|
zone: conf.Zone,
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package discovery
|
package zookeeper
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -42,17 +42,17 @@ type ZookeeperDiscovery struct {
|
||||||
|
|
||||||
// NewNerveDiscovery returns a new NerveDiscovery for the given config.
|
// NewNerveDiscovery returns a new NerveDiscovery for the given config.
|
||||||
func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery {
|
func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery {
|
||||||
return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
|
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
|
// NewServersetDiscovery returns a new ServersetDiscovery for the given config.
|
||||||
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery {
|
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery {
|
||||||
return NewZookeeperDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
|
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewZookeeperDiscovery returns a new discovery along Zookeeper parses with
|
// NewDiscovery returns a new discovery along Zookeeper parses with
|
||||||
// the given parse function.
|
// the given parse function.
|
||||||
func NewZookeeperDiscovery(
|
func NewDiscovery(
|
||||||
srvs []string,
|
srvs []string,
|
||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
paths []string,
|
paths []string,
|
||||||
|
|
|
@ -31,22 +31,6 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A TargetProvider provides information about target groups. It maintains a set
|
|
||||||
// of sources from which TargetGroups can originate. Whenever a target provider
|
|
||||||
// detects a potential change, it sends the TargetGroup through its provided channel.
|
|
||||||
//
|
|
||||||
// The TargetProvider does not have to guarantee that an actual change happened.
|
|
||||||
// It does guarantee that it sends the new TargetGroup whenever a change happens.
|
|
||||||
//
|
|
||||||
// Providers must initially send all known target groups as soon as it can.
|
|
||||||
type TargetProvider interface {
|
|
||||||
// Run hands a channel to the target provider through which it can send
|
|
||||||
// updated target groups. The channel must be closed by the target provider
|
|
||||||
// if no more updates will be sent.
|
|
||||||
// On receiving from done Run must return.
|
|
||||||
Run(ctx context.Context, up chan<- []*config.TargetGroup)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TargetManager maintains a set of targets, starts and stops their scraping and
|
// TargetManager maintains a set of targets, starts and stops their scraping and
|
||||||
// creates the new targets based on the target groups it receives from various
|
// creates the new targets based on the target groups it receives from various
|
||||||
// target providers.
|
// target providers.
|
||||||
|
@ -123,7 +107,7 @@ func (tm *TargetManager) reload() {
|
||||||
} else {
|
} else {
|
||||||
ts.reload(scfg)
|
ts.reload(scfg)
|
||||||
}
|
}
|
||||||
ts.runProviders(tm.ctx, providersFromConfig(scfg))
|
ts.runProviders(tm.ctx, discovery.ProvidersFromConfig(scfg))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove old target sets. Waiting for stopping is already guaranteed
|
// Remove old target sets. Waiting for stopping is already guaranteed
|
||||||
|
@ -257,7 +241,7 @@ func (ts *targetSet) sync() {
|
||||||
ts.scrapePool.sync(all)
|
ts.scrapePool.sync(all)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]TargetProvider) {
|
func (ts *targetSet) runProviders(ctx context.Context, providers map[string]discovery.TargetProvider) {
|
||||||
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
|
// Lock for the entire time. This may mean up to 5 seconds until the full initial set
|
||||||
// is retrieved and applied.
|
// is retrieved and applied.
|
||||||
// We could release earlier with some tweaks, but this is easier to reason about.
|
// We could release earlier with some tweaks, but this is easier to reason about.
|
||||||
|
@ -281,7 +265,7 @@ func (ts *targetSet) runProviders(ctx context.Context, providers map[string]Targ
|
||||||
|
|
||||||
updates := make(chan []*config.TargetGroup)
|
updates := make(chan []*config.TargetGroup)
|
||||||
|
|
||||||
go func(name string, prov TargetProvider) {
|
go func(name string, prov discovery.TargetProvider) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case initial, ok := <-updates:
|
case initial, ok := <-updates:
|
||||||
|
@ -366,71 +350,6 @@ func (ts *targetSet) update(name string, tgroup *config.TargetGroup) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// providersFromConfig returns all TargetProviders configured in cfg.
|
|
||||||
func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
|
|
||||||
providers := map[string]TargetProvider{}
|
|
||||||
|
|
||||||
app := func(mech string, i int, tp TargetProvider) {
|
|
||||||
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, c := range cfg.DNSSDConfigs {
|
|
||||||
app("dns", i, discovery.NewDNS(c))
|
|
||||||
}
|
|
||||||
for i, c := range cfg.FileSDConfigs {
|
|
||||||
app("file", i, discovery.NewFileDiscovery(c))
|
|
||||||
}
|
|
||||||
for i, c := range cfg.ConsulSDConfigs {
|
|
||||||
k, err := discovery.NewConsul(c)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot create Consul discovery: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
app("consul", i, k)
|
|
||||||
}
|
|
||||||
for i, c := range cfg.MarathonSDConfigs {
|
|
||||||
m, err := discovery.NewMarathon(c)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot create Marathon discovery: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
app("marathon", i, m)
|
|
||||||
}
|
|
||||||
for i, c := range cfg.KubernetesSDConfigs {
|
|
||||||
k, err := discovery.NewKubernetesDiscovery(c)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot create Kubernetes discovery: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
app("kubernetes", i, k)
|
|
||||||
}
|
|
||||||
for i, c := range cfg.ServersetSDConfigs {
|
|
||||||
app("serverset", i, discovery.NewServersetDiscovery(c))
|
|
||||||
}
|
|
||||||
for i, c := range cfg.NerveSDConfigs {
|
|
||||||
app("nerve", i, discovery.NewNerveDiscovery(c))
|
|
||||||
}
|
|
||||||
for i, c := range cfg.EC2SDConfigs {
|
|
||||||
app("ec2", i, discovery.NewEC2Discovery(c))
|
|
||||||
}
|
|
||||||
for i, c := range cfg.GCESDConfigs {
|
|
||||||
gced, err := discovery.NewGCEDiscovery(c)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Cannot initialize GCE discovery: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
app("gce", i, gced)
|
|
||||||
}
|
|
||||||
for i, c := range cfg.AzureSDConfigs {
|
|
||||||
app("azure", i, discovery.NewAzureDiscovery(c))
|
|
||||||
}
|
|
||||||
if len(cfg.StaticConfigs) > 0 {
|
|
||||||
app("static", 0, NewStaticProvider(cfg.StaticConfigs))
|
|
||||||
}
|
|
||||||
|
|
||||||
return providers
|
|
||||||
}
|
|
||||||
|
|
||||||
// populateLabels builds a label set from the given label set and scrape configuration.
|
// populateLabels builds a label set from the given label set and scrape configuration.
|
||||||
// It returns a label set before relabeling was applied as the second return value.
|
// It returns a label set before relabeling was applied as the second return value.
|
||||||
// Returns a nil label set if the target is dropped during relabeling.
|
// Returns a nil label set if the target is dropped during relabeling.
|
||||||
|
@ -530,28 +449,3 @@ func targetsFromGroup(tg *config.TargetGroup, cfg *config.ScrapeConfig) ([]*Targ
|
||||||
}
|
}
|
||||||
return targets, nil
|
return targets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StaticProvider holds a list of target groups that never change.
|
|
||||||
type StaticProvider struct {
|
|
||||||
TargetGroups []*config.TargetGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewStaticProvider returns a StaticProvider configured with the given
|
|
||||||
// target groups.
|
|
||||||
func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
|
|
||||||
for i, tg := range groups {
|
|
||||||
tg.Source = fmt.Sprintf("%d", i)
|
|
||||||
}
|
|
||||||
return &StaticProvider{groups}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run implements the TargetProvider interface.
|
|
||||||
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
|
|
||||||
// We still have to consider that the consumer exits right away in which case
|
|
||||||
// the context will be canceled.
|
|
||||||
select {
|
|
||||||
case ch <- sd.TargetGroups:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
close(ch)
|
|
||||||
}
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
|
"github.com/prometheus/prometheus/retrieval/discovery"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -55,7 +56,7 @@ static_configs:
|
||||||
|
|
||||||
ts := newTargetSet(scrapeConfig, mss)
|
ts := newTargetSet(scrapeConfig, mss)
|
||||||
|
|
||||||
ts.runProviders(context.Background(), providersFromConfig(scrapeConfig))
|
ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig))
|
||||||
|
|
||||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||||
verifyPresence(ts.tgroups, "static/0/1", true)
|
verifyPresence(ts.tgroups, "static/0/1", true)
|
||||||
|
@ -69,7 +70,7 @@ static_configs:
|
||||||
t.Fatalf("Unable to load YAML config sTwo: %s", err)
|
t.Fatalf("Unable to load YAML config sTwo: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ts.runProviders(context.Background(), providersFromConfig(scrapeConfig))
|
ts.runProviders(context.Background(), discovery.ProvidersFromConfig(scrapeConfig))
|
||||||
|
|
||||||
verifyPresence(ts.tgroups, "static/0/0", true)
|
verifyPresence(ts.tgroups, "static/0/0", true)
|
||||||
verifyPresence(ts.tgroups, "static/0/1", false)
|
verifyPresence(ts.tgroups, "static/0/1", false)
|
||||||
|
|
Loading…
Reference in a new issue