Add service discovery using Marathon API.

This commit is contained in:
Robert Jacob 2015-07-16 14:02:07 +02:00
parent a59b7ac7f8
commit 4d0f974c42
10 changed files with 508 additions and 1 deletions

3
.dockerignore Normal file
View file

@ -0,0 +1,3 @@
data/
prometheus
promtool

View file

@ -97,6 +97,11 @@ var (
DefaultServersetSDConfig = ServersetSDConfig{ DefaultServersetSDConfig = ServersetSDConfig{
Timeout: Duration(10 * time.Second), Timeout: Duration(10 * time.Second),
} }
// DefaultMarathonSDConfig is the default Marathon SD configuration.
// MarathonSDConfig.UnmarshalYAML copies it into the receiver before decoding,
// so a config that omits refresh_interval keeps the 30 second value set here.
DefaultMarathonSDConfig = MarathonSDConfig{
RefreshInterval: Duration(30 * time.Second),
}
) )
// Config is the top-level configuration for Prometheus's config files. // Config is the top-level configuration for Prometheus's config files.
@ -239,6 +244,8 @@ type ScrapeConfig struct {
ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"` ConsulSDConfigs []*ConsulSDConfig `yaml:"consul_sd_configs,omitempty"`
// List of Serverset service discovery configurations. // List of Serverset service discovery configurations.
ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"` ServersetSDConfigs []*ServersetSDConfig `yaml:"serverset_sd_configs,omitempty"`
// MarathonSDConfigs is a list of Marathon service discovery configurations.
MarathonSDConfigs []*MarathonSDConfig `yaml:"marathon_sd_configs,omitempty"`
// List of target relabel configurations. // List of target relabel configurations.
RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"` RelabelConfigs []*RelabelConfig `yaml:"relabel_configs,omitempty"`
@ -474,6 +481,29 @@ func (c *ServersetSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
return checkOverflow(c.XXX, "serverset_sd_config") return checkOverflow(c.XXX, "serverset_sd_config")
} }
// MarathonSDConfig is the configuration for services running on Marathon.
type MarathonSDConfig struct {
// Servers lists the Marathon servers to query. UnmarshalYAML rejects a
// configuration in which this list is empty.
Servers []string `yaml:"servers,omitempty"`
// RefreshInterval is read from the "refresh_interval" key; when the key is
// absent the value from DefaultMarathonSDConfig is kept.
RefreshInterval Duration `yaml:"refresh_interval,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
// It seeds the receiver with DefaultMarathonSDConfig, decodes the YAML on top
// of it, requires at least one server and finally hands the inline catch-all
// map XXX to checkOverflow.
func (c *MarathonSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// The local alias carries no methods, so decoding into it does not call
	// back into this UnmarshalYAML.
	type plain MarathonSDConfig

	*c = DefaultMarathonSDConfig
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}
	if len(c.Servers) < 1 {
		return fmt.Errorf("Marathon SD config must contain at least one Marathon server")
	}
	return checkOverflow(c.XXX, "marathon_sd_config")
}
// RelabelAction is the action to be performed on relabeling. // RelabelAction is the action to be performed on relabeling.
type RelabelAction string type RelabelAction string
@ -510,7 +540,7 @@ type RelabelConfig struct {
// Separator is the string between concatenated values from the source labels. // Separator is the string between concatenated values from the source labels.
Separator string `yaml:"separator,omitempty"` Separator string `yaml:"separator,omitempty"`
// Regex against which the concatenation is matched. // Regex against which the concatenation is matched.
Regex *Regexp `yaml:"regex",omitempty` Regex *Regexp `yaml:"regex,omitempty"`
// Modulus to take of the hash of concatenated values from the source labels. // Modulus to take of the hash of concatenated values from the source labels.
Modulus uint64 `yaml:"modulus,omitempty"` Modulus uint64 `yaml:"modulus,omitempty"`
// The label to which the resulting string is written in a replacement. // The label to which the resulting string is written in a replacement.

View file

@ -0,0 +1,98 @@
package discovery
import (
"time"
"github.com/prometheus/log"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// MarathonDiscovery provides service discovery based on a Marathon instance.
type MarathonDiscovery struct {
// servers is the list passed to marathon.RandomAppsURL on every fetch.
servers []string
// refreshInterval is the delay Run waits before each call to updateServices.
refreshInterval time.Duration
// done receives a value from Stop; Run returns when it reads one.
done chan struct{}
// lastRefresh holds the target groups of the previous successful
// updateServices call, keyed by source. It is compared against the next
// result to find groups that disappeared.
lastRefresh map[string]*config.TargetGroup
// client fetches the app list for a URL. NewMarathonDiscovery sets it to
// marathon.FetchMarathonApps; the tests replace it with stubs.
client marathon.AppListClient
}
// NewMarathonDiscovery creates a new Marathon based discovery.
// The server list and refresh interval are taken from conf, the stop channel
// is unbuffered and the client defaults to marathon.FetchMarathonApps.
func NewMarathonDiscovery(conf *config.MarathonSDConfig) *MarathonDiscovery {
	md := new(MarathonDiscovery)
	md.servers = conf.Servers
	md.refreshInterval = time.Duration(conf.RefreshInterval)
	md.done = make(chan struct{})
	md.client = marathon.FetchMarathonApps
	return md
}
// Sources implements the TargetProvider interface.
// It performs a fetch and returns the source names of the resulting target
// groups. A failed fetch yields a nil slice; the error itself is dropped.
func (md *MarathonDiscovery) Sources() []string {
	groups, err := md.fetchTargetGroups()
	if err != nil {
		return nil
	}

	var names []string
	for name := range groups {
		names = append(names, name)
	}
	return names
}
// Run implements the TargetProvider interface.
// It loops until Stop signals the done channel, calling updateServices each
// time refreshInterval has elapsed. Update errors are logged and do not end
// the loop. ch is closed when Run returns.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup) {
	defer close(ch)

	for {
		// A new timer is armed on every pass, so the interval is measured
		// from the end of the previous update.
		refresh := time.After(md.refreshInterval)

		select {
		case <-md.done:
			log.Debug("Shutting down marathon discovery.")
			return
		case <-refresh:
			if err := md.updateServices(ch); err != nil {
				log.Errorf("Error while updating services: %s", err)
			}
		}
	}
}
// Stop implements the TargetProvider interface.
// It sends one value on the done channel. NewMarathonDiscovery creates that
// channel unbuffered, so the call blocks until Run receives the value.
func (md *MarathonDiscovery) Stop() {
	var signal struct{}
	md.done <- signal
}
// updateServices fetches the current target groups and publishes them on ch.
// Every fetched group is sent first; afterwards an empty group (Source only)
// is sent for each source that was present in the previous refresh but is
// missing now. On a fetch error nothing is sent and lastRefresh is untouched.
func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
	current, err := md.fetchTargetGroups()
	if err != nil {
		return err
	}

	// Publish everything that is still present.
	for _, group := range current {
		ch <- group
	}

	// Announce removal of sources that have disappeared since last time.
	for source := range md.lastRefresh {
		if _, present := current[source]; present {
			continue
		}
		log.Debugf("Removing group for %s", source)
		ch <- &config.TargetGroup{Source: source}
	}

	md.lastRefresh = current
	return nil
}
// fetchTargetGroups asks one randomly chosen server for its app list via
// md.client and converts the answer into target groups keyed by source.
func (md *MarathonDiscovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
	apps, err := md.client(marathon.RandomAppsURL(md.servers))
	if err != nil {
		return nil, err
	}
	return marathon.AppsToTargetGroups(apps), nil
}

View file

@ -0,0 +1,34 @@
package marathon
import (
"encoding/json"
"io/ioutil"
"net/http"
)
// AppListClient defines a function that can be used to get an application list from marathon.
// It takes the full URL to query and returns the decoded list or an error.
// FetchMarathonApps has this signature.
type AppListClient func(url string) (*AppList, error)
// FetchMarathonApps requests a list of applications from a marathon server.
// It issues a GET for url, reads the complete response body and decodes it
// with parseAppJSON. A transport, read or decode error is returned together
// with a nil list.
//
// NOTE(review): the HTTP status code is not inspected, so a non-2xx answer
// whose body happens to be valid JSON is decoded like a normal response.
func FetchMarathonApps(url string) (*AppList, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	// Close the body on every path; otherwise each refresh leaks the
	// underlying connection.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return parseAppJSON(body)
}
// parseAppJSON decodes a JSON document into an AppList.
// It returns nil and the decoder's error when body cannot be unmarshalled.
func parseAppJSON(body []byte) (*AppList, error) {
	var list AppList
	if err := json.Unmarshal(body, &list); err != nil {
		return nil, err
	}
	return &list, nil
}

View file

@ -0,0 +1,16 @@
package marathon
import (
"github.com/prometheus/client_golang/model"
)
// Meta label names attached to discovered Marathon targets.
const (
// AppLabel is used for the name of the app in Marathon.
AppLabel model.LabelName = "__meta_marathon_app"
// ImageLabel is the label that is used for the docker image running the service.
ImageLabel model.LabelName = "__meta_marathon_image"
// TaskLabel contains the mesos task name of the app instance.
TaskLabel model.LabelName = "__meta_marathon_task"
)

View file

@ -0,0 +1,76 @@
package marathon
import (
"fmt"
"strings"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
)
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups.
// Apps rejected by isValidApp are left out. The result is keyed by each
// group's Source; it is an empty, non-nil map when no app qualifies.
func AppsToTargetGroups(apps *AppList) map[string]*config.TargetGroup {
	groups := make(map[string]*config.TargetGroup)
	for i := range apps.Apps {
		app := &apps.Apps[i]
		if !isValidApp(app) {
			continue
		}
		tg := createTargetGroup(app)
		groups[tg.Source] = tg
	}
	return groups
}
// createTargetGroup builds the target group for a single app: one target per
// task, group-level labels for the sanitized app name and the docker image,
// and a Source derived from the app ID.
func createTargetGroup(app *App) *config.TargetGroup {
	groupLabels := clientmodel.LabelSet{
		AppLabel:   clientmodel.LabelValue(sanitizeName(app.ID)),
		ImageLabel: clientmodel.LabelValue(imageName(app)),
	}
	return &config.TargetGroup{
		Targets: targetsForApp(app),
		Labels:  groupLabels,
		Source:  targetGroupName(app),
	}
}
// targetsForApp returns one label set per task of app, carrying the scrape
// address ("host:port") and the sanitized task ID.
//
// Tasks without any port are skipped: targetForTask reads Ports[0] and would
// panic on an empty slice, and such a task has no address to scrape anyway.
func targetsForApp(app *App) []clientmodel.LabelSet {
	targets := make([]clientmodel.LabelSet, 0, len(app.Tasks))
	for i := range app.Tasks {
		t := &app.Tasks[i]
		if len(t.Ports) == 0 {
			continue
		}
		target := targetForTask(t)
		targets = append(targets, clientmodel.LabelSet{
			clientmodel.AddressLabel: clientmodel.LabelValue(target),
			TaskLabel:                clientmodel.LabelValue(sanitizeName(t.ID)),
		})
	}
	return targets
}
// imageName returns the docker image recorded in the app's container
// description; it is the empty string when that field was not set.
func imageName(app *App) string {
	docker := app.Container.Docker
	return docker.Image
}
// targetForTask formats the scrape address of a task as "host:port" using
// the first entry of task.Ports. It panics if task.Ports is empty.
func targetForTask(task *Task) string {
	host, port := task.Host, task.Ports[0]
	return fmt.Sprintf("%s:%d", host, port)
}
// isValidApp reports whether app should be turned into a target group: it
// must have at least one running task and carry a label with the key
// "prometheus" (the label's value is not inspected).
func isValidApp(app *App) bool {
	if app.RunningTasks <= 0 {
		return false
	}
	_, labelled := app.Labels["prometheus"]
	return labelled
}
// targetGroupName returns the Source identifier for app's target group:
// the prefix "marathon:" followed by the sanitized app ID.
func targetGroupName(app *App) string {
	name := sanitizeName(app.ID)
	return fmt.Sprintf("marathon:%s", name)
}
// sanitizeName strips every leading space, '-', '/' and '.' from id and then
// turns each remaining '/' into '-'.
func sanitizeName(id string) string {
	const leadingCutset = " -/."
	segments := strings.Split(strings.TrimLeft(id, leadingCutset), "/")
	return strings.Join(segments, "-")
}

View file

@ -0,0 +1,32 @@
package marathon
// Task describes one instance of a service running on Marathon.
type Task struct {
// ID is decoded from the JSON key "id".
ID string `json:"id"`
// Host is decoded from the JSON key "host"; it forms the host part of the
// scrape address.
Host string `json:"host"`
// Ports is decoded from the JSON key "ports"; only the first entry is used
// for the scrape address.
Ports []uint32 `json:"ports"`
}
// DockerContainer describes a container which uses the docker runtime.
type DockerContainer struct {
// Image is decoded from the JSON key "image".
Image string `json:"image"`
}
// Container describes the runtime an app is running in.
type Container struct {
// Docker is decoded from the JSON key "docker".
Docker DockerContainer `json:"docker"`
}
// App describes a service running on Marathon.
type App struct {
// ID is decoded from the JSON key "id".
ID string `json:"id"`
// Tasks is decoded from the JSON key "tasks".
Tasks []Task `json:"tasks"`
// RunningTasks is decoded from the JSON key "tasksRunning".
RunningTasks int `json:"tasksRunning"`
// Labels is decoded from the JSON key "labels".
Labels map[string]string `json:"labels"`
// Container is decoded from the JSON key "container".
Container Container `json:"container"`
}
// AppList is a list of Marathon apps.
type AppList struct {
// Apps is decoded from the JSON key "apps".
Apps []App `json:"apps"`
}

View file

@ -0,0 +1,15 @@
package marathon
import (
"fmt"
"math/rand"
)
// appListPath is the path and query appended to a server address to request
// the app list.
const appListPath string = "/v2/apps/?embed=apps.tasks"

// RandomAppsURL randomly selects a server from an array and creates a URL pointing to the app list.
// It returns the empty string when servers is empty, so that callers get an
// ordinary request error instead of a panic from rand.Intn(0).
func RandomAppsURL(servers []string) string {
	// TODO If possible update server list from Marathon at some point
	if len(servers) == 0 {
		return ""
	}
	server := servers[rand.Intn(len(servers))]
	return fmt.Sprintf("%s%s", server, appListPath)
}

View file

@ -0,0 +1,200 @@
package discovery
import (
"errors"
"testing"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/retrieval/discovery/marathon"
)
// marathonValidLabel is the label map given to fixture apps that are expected
// to be discovered; it contains the "prometheus" key.
var marathonValidLabel = map[string]string{"prometheus": "yes"}
// newTestDiscovery returns an unbuffered target group channel together with a
// MarathonDiscovery that points at a fixed local server address and uses the
// supplied client in place of the real HTTP fetcher.
func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) {
	conf := &config.MarathonSDConfig{
		Servers: []string{"http://localhost:8080"},
	}
	md := NewMarathonDiscovery(conf)
	md.client = client
	return make(chan *config.TargetGroup), md
}
// TestMarathonSDHandleError checks that a client error is returned unchanged
// by updateServices and that no target group is published.
func TestMarathonSDHandleError(t *testing.T) {
	var errTesting = errors.New("testing failure")
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return nil, errTesting
	})

	// The receiver drains ch until it is closed, so an unexpected send can
	// never block updateServices. It reports with Errorf because FailNow
	// must only be called from the goroutine running the test.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tg := range ch {
			t.Errorf("Got group: %s", tg)
		}
	}()

	err := md.updateServices(ch)
	close(ch)
	<-done

	if err != errTesting {
		t.Fatalf("Expected error: %s", err)
	}
}
// TestMarathonSDEmptyList checks that an empty app list produces neither an
// error nor any target group.
func TestMarathonSDEmptyList(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return &marathon.AppList{}, nil
	})

	// The receiver drains ch until it is closed, so an unexpected send can
	// never block updateServices. It reports with Errorf because FailNow
	// must only be called from the goroutine running the test.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tg := range ch {
			t.Errorf("Got group: %v", tg)
		}
	}()

	err := md.updateServices(ch)
	close(ch)
	<-done

	if err != nil {
		t.Fatalf("Got error: %s", err)
	}
}
// marathonTestAppList builds an app list holding one app, "test-service",
// with a single task on mesos-slave1:31000 and the docker image
// "repo/image:tag". Labels and the running-task count come from the caller.
func marathonTestAppList(labels map[string]string, runningTasks int) *marathon.AppList {
	app := marathon.App{
		ID: "test-service",
		Tasks: []marathon.Task{{
			ID:    "test-task-1",
			Host:  "mesos-slave1",
			Ports: []uint32{31000},
		}},
		RunningTasks: runningTasks,
		Labels:       labels,
		Container: marathon.Container{
			Docker: marathon.DockerContainer{Image: "repo/image:tag"},
		},
	}
	return &marathon.AppList{Apps: []marathon.App{app}}
}
// TestMarathonSDSendGroup checks that a labelled, running app is published as
// exactly one target group with the expected source and target address.
func TestMarathonSDSendGroup(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(marathonValidLabel, 1), nil
	})

	// The receiver blocks for the first group instead of polling once: a
	// non-blocking receive could run before updateServices sends, after
	// which the send would have no reader. Failures use Errorf because
	// FailNow must only be called from the goroutine running the test.
	done := make(chan struct{})
	go func() {
		defer close(done)
		tg, ok := <-ch
		if !ok {
			t.Error("Did not get a target group.")
			return
		}
		if tg.Source != "marathon:test-service" {
			t.Errorf("Wrong target group name: %s", tg.Source)
		}
		if len(tg.Targets) != 1 {
			t.Errorf("Wrong number of targets: %v", tg.Targets)
		} else if addr := tg.Targets[0][clientmodel.AddressLabel]; addr != "mesos-slave1:31000" {
			t.Errorf("Wrong target address: %s", addr)
		}
		// Keep draining so an unexpected extra send cannot block the test.
		for extra := range ch {
			t.Errorf("Got unexpected extra group: %s", extra)
		}
	}()

	err := md.updateServices(ch)
	close(ch)
	<-done

	if err != nil {
		t.Fatalf("Got error: %s", err)
	}
}
// TestMarathonSDNoLabel checks that a running app without the "prometheus"
// label is not published.
func TestMarathonSDNoLabel(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(map[string]string{}, 1), nil
	})

	// The receiver drains ch until it is closed, so an unexpected send can
	// never block updateServices. It reports with Errorf because FailNow
	// must only be called from the goroutine running the test.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tg := range ch {
			t.Errorf("Got group: %s", tg)
		}
	}()

	err := md.updateServices(ch)
	close(ch)
	<-done

	if err != nil {
		t.Fatalf("Got error: %s", err)
	}
}
// TestMarathonSDNotRunning checks that a labelled app with no running tasks
// is not published.
func TestMarathonSDNotRunning(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(marathonValidLabel, 0), nil
	})

	// The receiver drains ch until it is closed, so an unexpected send can
	// never block updateServices. It reports with Errorf because FailNow
	// must only be called from the goroutine running the test.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for tg := range ch {
			t.Errorf("Got group: %s", tg)
		}
	}()

	err := md.updateServices(ch)
	close(ch)
	<-done

	if err != nil {
		t.Fatalf("Got error: %s", err)
	}
}
// TestMarathonSDRemoveApp checks that an app which stops qualifying between
// two refreshes is followed by a second update for the same source that
// carries no targets.
func TestMarathonSDRemoveApp(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(marathonValidLabel, 1), nil
	})

	// The receiver reports with Errorf because FailNow must only be called
	// from the goroutine running the test, and signals done so that its
	// checks are guaranteed to run before the test returns.
	done := make(chan struct{})
	go func() {
		defer close(done)
		up1, ok := <-ch
		if !ok {
			t.Error("Did not get the initial target group.")
			return
		}
		up2, ok := <-ch
		if !ok {
			t.Error("Did not get the removal update.")
			return
		}
		// The two checks are independent: the removal must name the same
		// source and must not carry any targets.
		if up2.Source != up1.Source {
			t.Errorf("Source is different: %s", up2)
		}
		if len(up2.Targets) > 0 {
			t.Errorf("Got a non-empty target set: %s", up2.Targets)
		}
	}()

	err := md.updateServices(ch)
	if err != nil {
		t.Fatalf("Got error on first update: %s", err)
	}

	md.client = func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(marathonValidLabel, 0), nil
	}
	err = md.updateServices(ch)
	// Closing ch releases the receiver even when an expected update is missing.
	close(ch)
	<-done

	if err != nil {
		t.Fatalf("Got error on second update: %s", err)
	}
}
func TestMarathonSDSources(t *testing.T) {
_, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil
})
sources := md.Sources()
if len(sources) != 1 {
t.Fatalf("Wrong number of sources: %s", sources)
}
}
// TestMarathonSDRunAndStop checks that Run publishes an update within three
// refresh intervals, returns after Stop and closes the channel on return.
func TestMarathonSDRunAndStop(t *testing.T) {
	ch, md := newTestDiscovery(func(url string) (*marathon.AppList, error) {
		return marathonTestAppList(marathonValidLabel, 1), nil
	})
	md.refreshInterval = time.Millisecond * 10

	// The helper reports with Errorf because FailNow must only be called
	// from the goroutine running the test; done makes sure it has finished
	// before the test returns.
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-ch:
			md.Stop()
		case <-time.After(md.refreshInterval * 3):
			md.Stop()
			t.Errorf("Update took too long.")
		}
	}()

	md.Run(ch)
	<-done

	// Run closes ch on return, so a receive must succeed immediately.
	select {
	case <-ch:
	default:
		t.Fatalf("Channel not closed.")
	}
}

View file

@ -392,6 +392,9 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
for _, c := range cfg.ServersetSDConfigs { for _, c := range cfg.ServersetSDConfigs {
providers = append(providers, discovery.NewServersetDiscovery(c)) providers = append(providers, discovery.NewServersetDiscovery(c))
} }
for _, c := range cfg.MarathonSDConfigs {
providers = append(providers, discovery.NewMarathonDiscovery(c))
}
if len(cfg.TargetGroups) > 0 { if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups)) providers = append(providers, NewStaticProvider(cfg.TargetGroups))
} }