Merge pull request #2045 from bekbulatov/marathon_tls

Add tls support for marathon
This commit is contained in:
Julius Volz 2016-10-24 12:10:47 +01:00 committed by GitHub
commit 72e4f1caa3
7 changed files with 95 additions and 53 deletions

View file

@ -124,6 +124,7 @@ var (
// DefaultMarathonSDConfig is the default Marathon SD configuration. // DefaultMarathonSDConfig is the default Marathon SD configuration.
DefaultMarathonSDConfig = MarathonSDConfig{ DefaultMarathonSDConfig = MarathonSDConfig{
Timeout: model.Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second), RefreshInterval: model.Duration(30 * time.Second),
} }
@ -225,6 +226,13 @@ func resolveFilepaths(baseDir string, cfg *Config) {
kcfg.TLSConfig.CertFile = join(kcfg.TLSConfig.CertFile) kcfg.TLSConfig.CertFile = join(kcfg.TLSConfig.CertFile)
kcfg.TLSConfig.KeyFile = join(kcfg.TLSConfig.KeyFile) kcfg.TLSConfig.KeyFile = join(kcfg.TLSConfig.KeyFile)
} }
for _, mcfg := range scfg.MarathonSDConfigs {
mcfg.TLSConfig.CAFile = join(mcfg.TLSConfig.CAFile)
mcfg.TLSConfig.CertFile = join(mcfg.TLSConfig.CertFile)
mcfg.TLSConfig.KeyFile = join(mcfg.TLSConfig.KeyFile)
}
} }
} }
@ -769,7 +777,9 @@ func (c *NerveSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
// MarathonSDConfig is the configuration for services running on Marathon. // MarathonSDConfig is the configuration for services running on Marathon.
type MarathonSDConfig struct { type MarathonSDConfig struct {
Servers []string `yaml:"servers,omitempty"` Servers []string `yaml:"servers,omitempty"`
Timeout model.Duration `yaml:"timeout,omitempty"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
// Catches all undefined fields and must be empty after parsing. // Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"` XXX map[string]interface{} `yaml:",inline"`

View file

@ -267,9 +267,14 @@ var expectedConf = &Config{
MarathonSDConfigs: []*MarathonSDConfig{ MarathonSDConfigs: []*MarathonSDConfig{
{ {
Servers: []string{ Servers: []string{
"http://marathon.example.com:8080", "https://marathon.example.com:443",
}, },
Timeout: model.Duration(30 * time.Second),
RefreshInterval: model.Duration(30 * time.Second), RefreshInterval: model.Duration(30 * time.Second),
TLSConfig: TLSConfig{
CertFile: "testdata/valid_cert_file",
KeyFile: "testdata/valid_key_file",
},
}, },
}, },
}, },

View file

@ -132,7 +132,11 @@ scrape_configs:
- job_name: service-marathon - job_name: service-marathon
marathon_sd_configs: marathon_sd_configs:
- servers: - servers:
- 'http://marathon.example.com:8080' - 'https://marathon.example.com:443'
tls_config:
cert_file: valid_cert_file
key_file: valid_key_file
- job_name: service-ec2 - job_name: service-ec2
ec2_sd_configs: ec2_sd_configs:

View file

@ -14,8 +14,6 @@
package discovery package discovery
import ( import (
"time"
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/consul" "github.com/prometheus/prometheus/retrieval/discovery/consul"
@ -35,12 +33,8 @@ func NewKubernetesDiscovery(conf *config.KubernetesSDConfig) (*kubernetes.Kubern
} }
// NewMarathon creates a new Marathon based discovery. // NewMarathon creates a new Marathon based discovery.
func NewMarathon(conf *config.MarathonSDConfig) *marathon.Discovery { func NewMarathon(conf *config.MarathonSDConfig) (*marathon.Discovery, error) {
return &marathon.Discovery{ return marathon.NewDiscovery(conf)
Servers: conf.Servers,
RefreshInterval: time.Duration(conf.RefreshInterval),
Client: marathon.FetchApps,
}
} }
// NewDNS creates a new DNS based discovery. // NewDNS creates a new DNS based discovery.

View file

@ -28,6 +28,7 @@ import (
"github.com/prometheus/common/log" "github.com/prometheus/common/log"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/util/httputil"
) )
const ( const (
@ -71,10 +72,33 @@ const appListPath string = "/v2/apps/?embed=apps.tasks"
// Discovery provides service discovery based on a Marathon instance. // Discovery provides service discovery based on a Marathon instance.
type Discovery struct { type Discovery struct {
Servers []string client *http.Client
RefreshInterval time.Duration servers []string
refreshInterval time.Duration
lastRefresh map[string]*config.TargetGroup lastRefresh map[string]*config.TargetGroup
Client AppListClient appsClient AppListClient
}
// Initialize sets up the discovery for usage.
func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil {
return nil, err
}
client := &http.Client{
Timeout: time.Duration(conf.Timeout),
Transport: &http.Transport{
TLSClientConfig: tls,
},
}
return &Discovery{
client: client,
servers: conf.Servers,
refreshInterval: time.Duration(conf.RefreshInterval),
appsClient: fetchApps,
}, nil
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
@ -85,7 +109,7 @@ func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-time.After(md.RefreshInterval): case <-time.After(md.refreshInterval):
err := md.updateServices(ctx, ch) err := md.updateServices(ctx, ch)
if err != nil { if err != nil {
log.Errorf("Error while updating services: %s", err) log.Errorf("Error while updating services: %s", err)
@ -137,8 +161,8 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar
} }
func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := RandomAppsURL(md.Servers) url := RandomAppsURL(md.servers)
apps, err := md.Client(url) apps, err := md.appsClient(md.client, url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -179,11 +203,11 @@ type AppList struct {
} }
// AppListClient defines a function that can be used to get an application list from marathon. // AppListClient defines a function that can be used to get an application list from marathon.
type AppListClient func(url string) (*AppList, error) type AppListClient func(client *http.Client, url string) (*AppList, error)
// FetchApps requests a list of applications from a marathon server. // fetchApps requests a list of applications from a marathon server.
func FetchApps(url string) (*AppList, error) { func fetchApps(client *http.Client, url string) (*AppList, error) {
resp, err := http.Get(url) resp, err := client.Get(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -15,6 +15,7 @@ package marathon
import ( import (
"errors" "errors"
"net/http"
"testing" "testing"
"time" "time"
@ -27,13 +28,15 @@ import (
var ( var (
marathonValidLabel = map[string]string{"prometheus": "yes"} marathonValidLabel = map[string]string{"prometheus": "yes"}
testServers = []string{"http://localhost:8080"} testServers = []string{"http://localhost:8080"}
conf = config.MarathonSDConfig{Servers: testServers}
) )
func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error { func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
md := Discovery{ md, err := NewDiscovery(&conf)
Servers: testServers, if err != nil {
Client: client, return err
} }
md.appsClient = client
return md.updateServices(context.Background(), ch) return md.updateServices(context.Background(), ch)
} }
@ -41,7 +44,7 @@ func TestMarathonSDHandleError(t *testing.T) {
var ( var (
errTesting = errors.New("testing failure") errTesting = errors.New("testing failure")
ch = make(chan []*config.TargetGroup, 1) ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { return nil, errTesting } client = func(client *http.Client, url string) (*AppList, error) { return nil, errTesting }
) )
if err := testUpdateServices(client, ch); err != errTesting { if err := testUpdateServices(client, ch); err != errTesting {
t.Fatalf("Expected error: %s", err) t.Fatalf("Expected error: %s", err)
@ -56,7 +59,7 @@ func TestMarathonSDHandleError(t *testing.T) {
func TestMarathonSDEmptyList(t *testing.T) { func TestMarathonSDEmptyList(t *testing.T) {
var ( var (
ch = make(chan []*config.TargetGroup, 1) ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { return &AppList{}, nil } client = func(client *http.Client, url string) (*AppList, error) { return &AppList{}, nil }
) )
if err := testUpdateServices(client, ch); err != nil { if err := testUpdateServices(client, ch); err != nil {
t.Fatalf("Got error: %s", err) t.Fatalf("Got error: %s", err)
@ -95,7 +98,7 @@ func marathonTestAppList(labels map[string]string, runningTasks int) *AppList {
func TestMarathonSDSendGroup(t *testing.T) { func TestMarathonSDSendGroup(t *testing.T) {
var ( var (
ch = make(chan []*config.TargetGroup, 1) ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { client = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
} }
) )
@ -122,16 +125,14 @@ func TestMarathonSDSendGroup(t *testing.T) {
} }
func TestMarathonSDRemoveApp(t *testing.T) { func TestMarathonSDRemoveApp(t *testing.T) {
var ( var ch = make(chan []*config.TargetGroup)
ch = make(chan []*config.TargetGroup) md, err := NewDiscovery(&conf)
client = func(url string) (*AppList, error) { if err != nil {
return marathonTestAppList(marathonValidLabel, 1), nil t.Fatalf("%s", err)
} }
md = Discovery{ md.appsClient = func(client *http.Client, url string) (*AppList, error) {
Servers: testServers, return marathonTestAppList(marathonValidLabel, 1), nil
Client: client, }
}
)
go func() { go func() {
up1 := (<-ch)[0] up1 := (<-ch)[0]
up2 := (<-ch)[0] up2 := (<-ch)[0]
@ -142,32 +143,31 @@ func TestMarathonSDRemoveApp(t *testing.T) {
} }
} }
}() }()
err := md.updateServices(context.Background(), ch) if err := md.updateServices(context.Background(), ch); err != nil {
if err != nil {
t.Fatalf("Got error on first update: %s", err) t.Fatalf("Got error on first update: %s", err)
} }
md.Client = func(url string) (*AppList, error) { md.appsClient = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil return marathonTestAppList(marathonValidLabel, 0), nil
} }
err = md.updateServices(context.Background(), ch) if err := md.updateServices(context.Background(), ch); err != nil {
if err != nil {
t.Fatalf("Got error on second update: %s", err) t.Fatalf("Got error on second update: %s", err)
} }
} }
func TestMarathonSDRunAndStop(t *testing.T) { func TestMarathonSDRunAndStop(t *testing.T) {
var ( var (
ch = make(chan []*config.TargetGroup) refreshInterval = model.Duration(time.Millisecond * 10)
client = func(url string) (*AppList, error) { conf = config.MarathonSDConfig{Servers: testServers, RefreshInterval: refreshInterval}
return marathonTestAppList(marathonValidLabel, 1), nil ch = make(chan []*config.TargetGroup)
}
md = Discovery{
Servers: testServers,
Client: client,
RefreshInterval: time.Millisecond * 10,
}
) )
md, err := NewDiscovery(&conf)
if err != nil {
t.Fatalf("%s", err)
}
md.appsClient = func(client *http.Client, url string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
@ -178,7 +178,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
return return
} }
cancel() cancel()
case <-time.After(md.RefreshInterval * 3): case <-time.After(md.refreshInterval * 3):
cancel() cancel()
t.Fatalf("Update took too long.") t.Fatalf("Update took too long.")
} }
@ -213,7 +213,7 @@ func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int)
func TestMarathonZeroTaskPorts(t *testing.T) { func TestMarathonZeroTaskPorts(t *testing.T) {
var ( var (
ch = make(chan []*config.TargetGroup, 1) ch = make(chan []*config.TargetGroup, 1)
client = func(url string) (*AppList, error) { client = func(client *http.Client, url string) (*AppList, error) {
return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil return marathonTestZeroTaskPortAppList(marathonValidLabel, 1), nil
} }
) )

View file

@ -389,7 +389,12 @@ func providersFromConfig(cfg *config.ScrapeConfig) map[string]TargetProvider {
app("consul", i, k) app("consul", i, k)
} }
for i, c := range cfg.MarathonSDConfigs { for i, c := range cfg.MarathonSDConfigs {
app("marathon", i, discovery.NewMarathon(c)) m, err := discovery.NewMarathon(c)
if err != nil {
log.Errorf("Cannot create Marathon discovery: %s", err)
continue
}
app("marathon", i, m)
} }
for i, c := range cfg.KubernetesSDConfigs { for i, c := range cfg.KubernetesSDConfigs {
k, err := discovery.NewKubernetesDiscovery(c) k, err := discovery.NewKubernetesDiscovery(c)