mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-27 22:49:40 -08:00
2aba238f31
This adds support for basic authentication which closes #3090 The support for specifying the client timeout was removed as discussed in https://github.com/prometheus/common/pull/123. Marathon was the only sd mechanism doing this and configuring the timeout is done through `Context`. DC/OS uses a custom `Authorization` header for authenticating. This adds 2 new configuration properties to reflect this. Existing configuration files that use the bearer token will no longer work. More work is required to make this backwards compatible.
442 lines
14 KiB
Go
442 lines
14 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package marathon
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
config_util "github.com/prometheus/common/config"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/util/httputil"
|
|
"github.com/prometheus/prometheus/util/strutil"
|
|
)
|
|
|
|
const (
|
|
// metaLabelPrefix is the meta prefix used for all meta labels in this discovery.
|
|
metaLabelPrefix = model.MetaLabelPrefix + "marathon_"
|
|
// appLabelPrefix is the prefix for the application labels.
|
|
appLabelPrefix = metaLabelPrefix + "app_label_"
|
|
|
|
// appLabel is used for the name of the app in Marathon.
|
|
appLabel model.LabelName = metaLabelPrefix + "app"
|
|
// imageLabel is the label that is used for the docker image running the service.
|
|
imageLabel model.LabelName = metaLabelPrefix + "image"
|
|
// portIndexLabel is the integer port index when multiple ports are defined;
|
|
// e.g. PORT1 would have a value of '1'
|
|
portIndexLabel model.LabelName = metaLabelPrefix + "port_index"
|
|
// taskLabel contains the mesos task name of the app instance.
|
|
taskLabel model.LabelName = metaLabelPrefix + "task"
|
|
|
|
// portMappingLabelPrefix is the prefix for the application portMappings labels.
|
|
portMappingLabelPrefix = metaLabelPrefix + "port_mapping_label_"
|
|
// portDefinitionLabelPrefix is the prefix for the application portDefinitions labels.
|
|
portDefinitionLabelPrefix = metaLabelPrefix + "port_definition_label_"
|
|
|
|
// Constants for instrumentation.
|
|
namespace = "prometheus"
|
|
)
|
|
|
|
var (
|
|
refreshFailuresCount = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: namespace,
|
|
Name: "sd_marathon_refresh_failures_total",
|
|
Help: "The number of Marathon-SD refresh failures.",
|
|
})
|
|
refreshDuration = prometheus.NewSummary(
|
|
prometheus.SummaryOpts{
|
|
Namespace: namespace,
|
|
Name: "sd_marathon_refresh_duration_seconds",
|
|
Help: "The duration of a Marathon-SD refresh in seconds.",
|
|
})
|
|
// DefaultSDConfig is the default Marathon SD configuration.
|
|
DefaultSDConfig = SDConfig{
|
|
RefreshInterval: model.Duration(30 * time.Second),
|
|
}
|
|
)
|
|
|
|
// SDConfig is the configuration for services running on Marathon.
|
|
type SDConfig struct {
|
|
Servers []string `yaml:"servers,omitempty"`
|
|
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
|
|
AuthToken config_util.Secret `yaml:"auth_token,omitempty"`
|
|
AuthTokenFile string `yaml:"auth_token_file,omitempty"`
|
|
HTTPClientConfig config_util.HTTPClientConfig `yaml:",inline"`
|
|
}
|
|
|
|
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
|
func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|
*c = DefaultSDConfig
|
|
type plain SDConfig
|
|
err := unmarshal((*plain)(c))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(c.Servers) == 0 {
|
|
return fmt.Errorf("marathon_sd: must contain at least one Marathon server")
|
|
}
|
|
if len(c.AuthToken) > 0 && len(c.AuthTokenFile) > 0 {
|
|
return fmt.Errorf("marathon_sd: at most one of auth_token & auth_token_file must be configured")
|
|
}
|
|
if c.HTTPClientConfig.BasicAuth != nil && (len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0) {
|
|
return fmt.Errorf("marathon_sd: at most one of basic_auth, auth_token & auth_token_file must be configured")
|
|
}
|
|
if (len(c.HTTPClientConfig.BearerToken) > 0 || len(c.HTTPClientConfig.BearerTokenFile) > 0) && (len(c.AuthToken) > 0 || len(c.AuthTokenFile) > 0) {
|
|
return fmt.Errorf("marathon_sd: at most one of bearer_token, bearer_token_file, auth_token & auth_token_file must be configured")
|
|
}
|
|
if err := c.HTTPClientConfig.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func init() {
|
|
prometheus.MustRegister(refreshFailuresCount)
|
|
prometheus.MustRegister(refreshDuration)
|
|
}
|
|
|
|
const appListPath string = "/v2/apps/?embed=apps.tasks"
|
|
|
|
// Discovery provides service discovery based on a Marathon instance.
|
|
type Discovery struct {
|
|
client *http.Client
|
|
servers []string
|
|
refreshInterval time.Duration
|
|
lastRefresh map[string]*targetgroup.Group
|
|
appsClient AppListClient
|
|
logger log.Logger
|
|
}
|
|
|
|
// NewDiscovery returns a new Marathon Discovery.
|
|
func NewDiscovery(conf SDConfig, logger log.Logger) (*Discovery, error) {
|
|
if logger == nil {
|
|
logger = log.NewNopLogger()
|
|
}
|
|
|
|
rt, err := httputil.NewRoundTripperFromConfig(conf.HTTPClientConfig, "marathon_sd")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(conf.AuthToken) > 0 {
|
|
rt, err = newAuthTokenRoundTripper(conf.AuthToken, rt)
|
|
} else if len(conf.AuthTokenFile) > 0 {
|
|
rt, err = newAuthTokenFileRoundTripper(conf.AuthTokenFile, rt)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Discovery{
|
|
client: &http.Client{Transport: rt},
|
|
servers: conf.Servers,
|
|
refreshInterval: time.Duration(conf.RefreshInterval),
|
|
appsClient: fetchApps,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
type authTokenRoundTripper struct {
|
|
authToken config_util.Secret
|
|
rt http.RoundTripper
|
|
}
|
|
|
|
// newAuthTokenRoundTripper adds the provided auth token to a request.
|
|
func newAuthTokenRoundTripper(token config_util.Secret, rt http.RoundTripper) (http.RoundTripper, error) {
|
|
return &authTokenRoundTripper{token, rt}, nil
|
|
}
|
|
|
|
func (rt *authTokenRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
|
|
// According to https://docs.mesosphere.com/1.11/security/oss/managing-authentication/
|
|
// DC/OS wants with "token=" a different Authorization header than implemented in httputil/client.go
|
|
// so we set this explicitly here.
|
|
request.Header.Set("Authorization", "token="+string(rt.authToken))
|
|
|
|
return rt.rt.RoundTrip(request)
|
|
}
|
|
|
|
type authTokenFileRoundTripper struct {
|
|
authTokenFile string
|
|
rt http.RoundTripper
|
|
}
|
|
|
|
// newAuthTokenFileRoundTripper adds the auth token read from the file to a request.
|
|
func newAuthTokenFileRoundTripper(tokenFile string, rt http.RoundTripper) (http.RoundTripper, error) {
|
|
// fail-fast if we can't read the file.
|
|
_, err := ioutil.ReadFile(tokenFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to read auth token file %s: %s", tokenFile, err)
|
|
}
|
|
return &authTokenFileRoundTripper{tokenFile, rt}, nil
|
|
}
|
|
|
|
func (rt *authTokenFileRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
|
|
b, err := ioutil.ReadFile(rt.authTokenFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to read auth token file %s: %s", rt.authTokenFile, err)
|
|
}
|
|
authToken := strings.TrimSpace(string(b))
|
|
|
|
// According to https://docs.mesosphere.com/1.11/security/oss/managing-authentication/
|
|
// DC/OS wants with "token=" a different Authorization header than implemented in httputil/client.go
|
|
// so we set this explicitly here.
|
|
request.Header.Set("Authorization", "token="+authToken)
|
|
return rt.rt.RoundTrip(request)
|
|
}
|
|
|
|
// Run implements the Discoverer interface.
|
|
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(d.refreshInterval):
|
|
err := d.updateServices(ctx, ch)
|
|
if err != nil {
|
|
level.Error(d.logger).Log("msg", "Error while updating services", "err", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*targetgroup.Group) (err error) {
|
|
t0 := time.Now()
|
|
defer func() {
|
|
refreshDuration.Observe(time.Since(t0).Seconds())
|
|
if err != nil {
|
|
refreshFailuresCount.Inc()
|
|
}
|
|
}()
|
|
|
|
targetMap, err := d.fetchTargetGroups()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
all := make([]*targetgroup.Group, 0, len(targetMap))
|
|
for _, tg := range targetMap {
|
|
all = append(all, tg)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case ch <- all:
|
|
}
|
|
|
|
// Remove services which did disappear.
|
|
for source := range d.lastRefresh {
|
|
_, ok := targetMap[source]
|
|
if !ok {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case ch <- []*targetgroup.Group{{Source: source}}:
|
|
level.Debug(d.logger).Log("msg", "Removing group", "source", source)
|
|
}
|
|
}
|
|
}
|
|
|
|
d.lastRefresh = targetMap
|
|
return nil
|
|
}
|
|
|
|
func (d *Discovery) fetchTargetGroups() (map[string]*targetgroup.Group, error) {
|
|
url := RandomAppsURL(d.servers)
|
|
apps, err := d.appsClient(d.client, url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
groups := AppsToTargetGroups(apps)
|
|
return groups, nil
|
|
}
|
|
|
|
// Task describes one instance of a service running on Marathon.
|
|
type Task struct {
|
|
ID string `json:"id"`
|
|
Host string `json:"host"`
|
|
Ports []uint32 `json:"ports"`
|
|
}
|
|
|
|
// PortMappings describes in which port the process are binding inside the docker container.
|
|
type PortMappings struct {
|
|
Labels map[string]string `json:"labels"`
|
|
}
|
|
|
|
// DockerContainer describes a container which uses the docker runtime.
|
|
type DockerContainer struct {
|
|
Image string `json:"image"`
|
|
PortMappings []PortMappings `json:"portMappings"`
|
|
}
|
|
|
|
// Container describes the runtime an app in running in.
|
|
type Container struct {
|
|
Docker DockerContainer `json:"docker"`
|
|
PortMappings []PortMappings `json:"portMappings"`
|
|
}
|
|
|
|
// PortDefinitions describes which load balancer port you should access to access the service.
|
|
type PortDefinitions struct {
|
|
Labels map[string]string `json:"labels"`
|
|
}
|
|
|
|
// App describes a service running on Marathon.
|
|
type App struct {
|
|
ID string `json:"id"`
|
|
Tasks []Task `json:"tasks"`
|
|
RunningTasks int `json:"tasksRunning"`
|
|
Labels map[string]string `json:"labels"`
|
|
Container Container `json:"container"`
|
|
PortDefinitions []PortDefinitions `json:"portDefinitions"`
|
|
}
|
|
|
|
// AppList is a list of Marathon apps.
|
|
type AppList struct {
|
|
Apps []App `json:"apps"`
|
|
}
|
|
|
|
// AppListClient defines a function that can be used to get an application list from marathon.
|
|
type AppListClient func(client *http.Client, url string) (*AppList, error)
|
|
|
|
// fetchApps requests a list of applications from a marathon server.
|
|
func fetchApps(client *http.Client, url string) (*AppList, error) {
|
|
request, err := http.NewRequest("GET", url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := client.Do(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return parseAppJSON(body)
|
|
}
|
|
|
|
func parseAppJSON(body []byte) (*AppList, error) {
|
|
apps := &AppList{}
|
|
err := json.Unmarshal(body, apps)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return apps, nil
|
|
}
|
|
|
|
// RandomAppsURL randomly selects a server from an array and creates
|
|
// an URL pointing to the app list.
|
|
func RandomAppsURL(servers []string) string {
|
|
// TODO: If possible update server list from Marathon at some point.
|
|
server := servers[rand.Intn(len(servers))]
|
|
return fmt.Sprintf("%s%s", server, appListPath)
|
|
}
|
|
|
|
// AppsToTargetGroups takes an array of Marathon apps and converts them into target groups.
|
|
func AppsToTargetGroups(apps *AppList) map[string]*targetgroup.Group {
|
|
tgroups := map[string]*targetgroup.Group{}
|
|
for _, a := range apps.Apps {
|
|
group := createTargetGroup(&a)
|
|
tgroups[group.Source] = group
|
|
}
|
|
return tgroups
|
|
}
|
|
|
|
func createTargetGroup(app *App) *targetgroup.Group {
|
|
var (
|
|
targets = targetsForApp(app)
|
|
appName = model.LabelValue(app.ID)
|
|
image = model.LabelValue(app.Container.Docker.Image)
|
|
)
|
|
tg := &targetgroup.Group{
|
|
Targets: targets,
|
|
Labels: model.LabelSet{
|
|
appLabel: appName,
|
|
imageLabel: image,
|
|
},
|
|
Source: app.ID,
|
|
}
|
|
|
|
for ln, lv := range app.Labels {
|
|
ln = appLabelPrefix + strutil.SanitizeLabelName(ln)
|
|
tg.Labels[model.LabelName(ln)] = model.LabelValue(lv)
|
|
}
|
|
|
|
return tg
|
|
}
|
|
|
|
func targetsForApp(app *App) []model.LabelSet {
|
|
targets := make([]model.LabelSet, 0, len(app.Tasks))
|
|
for _, t := range app.Tasks {
|
|
if len(t.Ports) == 0 {
|
|
continue
|
|
}
|
|
for i := 0; i < len(t.Ports); i++ {
|
|
targetAddress := targetForTask(&t, i)
|
|
target := model.LabelSet{
|
|
model.AddressLabel: model.LabelValue(targetAddress),
|
|
taskLabel: model.LabelValue(t.ID),
|
|
portIndexLabel: model.LabelValue(strconv.Itoa(i)),
|
|
}
|
|
if i < len(app.PortDefinitions) {
|
|
for ln, lv := range app.PortDefinitions[i].Labels {
|
|
ln = portDefinitionLabelPrefix + strutil.SanitizeLabelName(ln)
|
|
target[model.LabelName(ln)] = model.LabelValue(lv)
|
|
}
|
|
}
|
|
// Prior to Marathon 1.5 the port mappings could be found at the path
|
|
// "container.docker.portMappings". When support for Marathon 1.4
|
|
// is dropped then this section of code can be removed.
|
|
if i < len(app.Container.Docker.PortMappings) {
|
|
for ln, lv := range app.Container.Docker.PortMappings[i].Labels {
|
|
ln = portMappingLabelPrefix + strutil.SanitizeLabelName(ln)
|
|
target[model.LabelName(ln)] = model.LabelValue(lv)
|
|
}
|
|
}
|
|
// In Marathon 1.5.x the container.docker.portMappings object was moved
|
|
// to container.portMappings.
|
|
if i < len(app.Container.PortMappings) {
|
|
for ln, lv := range app.Container.PortMappings[i].Labels {
|
|
ln = portMappingLabelPrefix + strutil.SanitizeLabelName(ln)
|
|
target[model.LabelName(ln)] = model.LabelValue(lv)
|
|
}
|
|
}
|
|
targets = append(targets, target)
|
|
}
|
|
}
|
|
return targets
|
|
}
|
|
|
|
func targetForTask(task *Task, index int) string {
|
|
return net.JoinHostPort(task.Host, fmt.Sprintf("%d", task.Ports[index]))
|
|
}
|