prometheus/discovery/nomad/nomad.go
Paulin Todev 78411d5e8b
SD Managers taking over responsibility for registration of debug metrics (#13375)
SD Managers take over responsibility for SD metrics registration

---------

Signed-off-by: Paulin Todev <paulin.todev@gmail.com>
Signed-off-by: Björn Rabenstein <github@rabenste.in>
Co-authored-by: Björn Rabenstein <github@rabenste.in>
2024-01-23 16:53:55 +01:00

217 lines
6.6 KiB
Go

// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nomad
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/go-kit/log"
nomad "github.com/hashicorp/nomad/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
const (
// nomadLabel is the name for the label containing a target.
nomadLabel = model.MetaLabelPrefix + "nomad_"
// serviceLabel is the name of the label containing the service name.
nomadAddress = nomadLabel + "address"
nomadService = nomadLabel + "service"
nomadNamespace = nomadLabel + "namespace"
nomadNodeID = nomadLabel + "node_id"
nomadDatacenter = nomadLabel + "dc"
nomadServiceAddress = nomadService + "_address"
nomadServicePort = nomadService + "_port"
nomadServiceID = nomadService + "_id"
nomadTags = nomadLabel + "tags"
)
// DefaultSDConfig is the default nomad SD configuration.
var DefaultSDConfig = SDConfig{
AllowStale: true,
HTTPClientConfig: config.DefaultHTTPClientConfig,
Namespace: "default",
RefreshInterval: model.Duration(60 * time.Second),
Region: "global",
Server: "http://localhost:4646",
TagSeparator: ",",
}
func init() {
discovery.RegisterConfig(&SDConfig{})
}
// SDConfig is the configuration for nomad based service discovery.
type SDConfig struct {
AllowStale bool `yaml:"allow_stale"`
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
Namespace string `yaml:"namespace"`
RefreshInterval model.Duration `yaml:"refresh_interval"`
Region string `yaml:"region"`
Server string `yaml:"server"`
TagSeparator string `yaml:"tag_separator,omitempty"`
}
// NewDiscovererMetrics implements discovery.Config.
func (*SDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
return newDiscovererMetrics(reg, rmi)
}
// Name returns the name of the Config.
func (*SDConfig) Name() string { return "nomad" }
// NewDiscoverer returns a Discoverer for the Config.
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
return NewDiscovery(c, opts.Logger, opts.Metrics)
}
// SetDirectory joins any relative file paths with dir.
func (c *SDConfig) SetDirectory(dir string) {
c.HTTPClientConfig.SetDirectory(dir)
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultSDConfig
type plain SDConfig
err := unmarshal((*plain)(c))
if err != nil {
return err
}
if strings.TrimSpace(c.Server) == "" {
return errors.New("nomad SD configuration requires a server address")
}
return c.HTTPClientConfig.Validate()
}
// Discovery periodically performs nomad requests. It implements
// the Discoverer interface.
type Discovery struct {
*refresh.Discovery
allowStale bool
client *nomad.Client
namespace string
refreshInterval time.Duration
region string
server string
tagSeparator string
metrics *nomadMetrics
}
// NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
m, ok := metrics.(*nomadMetrics)
if !ok {
return nil, fmt.Errorf("invalid discovery metrics type")
}
d := &Discovery{
allowStale: conf.AllowStale,
namespace: conf.Namespace,
refreshInterval: time.Duration(conf.RefreshInterval),
region: conf.Region,
server: conf.Server,
tagSeparator: conf.TagSeparator,
metrics: m,
}
HTTPClient, err := config.NewClientFromConfig(conf.HTTPClientConfig, "nomad_sd")
if err != nil {
return nil, err
}
config := nomad.Config{
Address: conf.Server,
HttpClient: HTTPClient,
Namespace: conf.Namespace,
Region: conf.Region,
}
client, err := nomad.NewClient(&config)
if err != nil {
return nil, err
}
d.client = client
d.Discovery = refresh.NewDiscovery(
refresh.Options{
Logger: logger,
Mech: "nomad",
Interval: time.Duration(conf.RefreshInterval),
RefreshF: d.refresh,
MetricsInstantiator: m.refreshMetrics,
},
)
return d, nil
}
func (d *Discovery) refresh(context.Context) ([]*targetgroup.Group, error) {
opts := &nomad.QueryOptions{
AllowStale: d.allowStale,
}
stubs, _, err := d.client.Services().List(opts)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, err
}
tg := &targetgroup.Group{
Source: "Nomad",
}
for _, stub := range stubs {
for _, service := range stub.Services {
instances, _, err := d.client.Services().Get(service.ServiceName, opts)
if err != nil {
d.metrics.failuresCount.Inc()
return nil, fmt.Errorf("failed to fetch services: %w", err)
}
for _, instance := range instances {
labels := model.LabelSet{
nomadAddress: model.LabelValue(instance.Address),
nomadDatacenter: model.LabelValue(instance.Datacenter),
nomadNodeID: model.LabelValue(instance.NodeID),
nomadNamespace: model.LabelValue(instance.Namespace),
nomadServiceAddress: model.LabelValue(instance.Address),
nomadServiceID: model.LabelValue(instance.ID),
nomadServicePort: model.LabelValue(strconv.Itoa(instance.Port)),
nomadService: model.LabelValue(instance.ServiceName),
}
addr := net.JoinHostPort(instance.Address, strconv.FormatInt(int64(instance.Port), 10))
labels[model.AddressLabel] = model.LabelValue(addr)
if len(instance.Tags) > 0 {
tags := d.tagSeparator + strings.Join(instance.Tags, d.tagSeparator) + d.tagSeparator
labels[nomadTags] = model.LabelValue(tags)
}
tg.Targets = append(tg.Targets, labels)
}
}
}
return []*targetgroup.Group{tg}, nil
}