diff --git a/discovery/manager.go b/discovery/manager.go index 5b5753f6f..669a91dc5 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -117,6 +117,13 @@ func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) e return nil } +// StartCustomProvider is used for sdtool. Only use this if you know what you're doing. +func (m *Manager) StartCustomProvider(ctx context.Context, name string, worker Discoverer) { + // Pool key for non-standard SD implementations are unknown. + poolKey := poolKey{setName: name, provider: name} + m.startProvider(ctx, poolKey, worker) +} + func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) { ctx, cancel := context.WithCancel(ctx) updates := make(chan []*targetgroup.Group) diff --git a/documentation/examples/custom-sd/README.md b/documentation/examples/custom-sd/README.md new file mode 100644 index 000000000..785933b45 --- /dev/null +++ b/documentation/examples/custom-sd/README.md @@ -0,0 +1,18 @@ +# Custom SD +Custom SD is available via the file_sd adapter, which can be used to access SD mechanisms that are not +included in official Prometheus releases. The sd adapter outputs a file that can be passed via your `prometheus.yml` +as a `file_sd` file. This will allow you to pass your targets to Prometheus for scraping without having +to use a static config. + +# Example +This directory (`documentation/examples/custom-sd`) contains an example custom service discovery implementation +as well as a file_sd adapter usage. `adapter-usage` contains the `Discoverer` implementation for a basic Consul +service discovery mechanism. It simply queries Consul for all it's known services (except Consul itself), +and sends them along with all the other service data as labels as a TargetGroup. +The `adapter` directory contains the adapter code you will want to important and pass your `Discoverer` +implementation to. + +# Usage +To use file_sd adapter you must implement a `Discoverer`. In `adapter-usage/main.go` replace the example +SD config and create an instance of your SD implementation to pass to the `Adapter`'s `NewAdapter`. See the +`Note:` comments for the structs and `Run` function. \ No newline at end of file diff --git a/documentation/examples/custom-sd/adapter-usage/main.go b/documentation/examples/custom-sd/adapter-usage/main.go new file mode 100644 index 000000000..071eb22ec --- /dev/null +++ b/documentation/examples/custom-sd/adapter-usage/main.go @@ -0,0 +1,251 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/documentation/examples/custom-sd/adapter" + "github.com/prometheus/prometheus/util/strutil" + "gopkg.in/alecthomas/kingpin.v2" +) + +var ( + a = kingpin.New("sd adapter usage", "Tool to generate file_sd target files for unimplemented SD mechanisms.") + outputFile = a.Flag("output.file", "Output file for file_sd compatible file.").Default("custom_sd.json").String() + listenAddress = a.Flag("listen.address", "The address the HTTP sd is listening on for requests.").Default("localhost:8080").String() + logger log.Logger + + // addressLabel is the name for the label containing a target's address. + addressLabel = model.MetaLabelPrefix + "consul_address" + // nodeLabel is the name for the label containing a target's node name. + nodeLabel = model.MetaLabelPrefix + "consul_node" + // metaDataLabel is the prefix for the labels mapping to a target's metadata. + metaDataLabel = model.MetaLabelPrefix + "consul_metadata_" + // tagsLabel is the name of the label containing the tags assigned to the target. + tagsLabel = model.MetaLabelPrefix + "consul_tags" + // serviceLabel is the name of the label containing the service name. + serviceLabel = model.MetaLabelPrefix + "consul_service" + // serviceAddressLabel is the name of the label containing the (optional) service address. + serviceAddressLabel = model.MetaLabelPrefix + "consul_service_address" + //servicePortLabel is the name of the label containing the service port. + servicePortLabel = model.MetaLabelPrefix + "consul_service_port" + // datacenterLabel is the name of the label containing the datacenter ID. + datacenterLabel = model.MetaLabelPrefix + "consul_dc" + // serviceIDLabel is the name of the label containing the service ID. + serviceIDLabel = model.MetaLabelPrefix + "consul_service_id" +) + +// CatalogService is copied from https://github.com/hashicorp/consul/blob/master/api/catalog.go +// this struct respresents the response from a /service/ request. +// Consul License: https://github.com/hashicorp/consul/blob/master/LICENSE +type CatalogService struct { + ID string + Node string + Address string + Datacenter string + TaggedAddresses map[string]string + NodeMeta map[string]string + ServiceID string + ServiceName string + ServiceAddress string + ServiceTags []string + ServicePort int + ServiceEnableTagOverride bool + CreateIndex uint64 + ModifyIndex uint64 +} + +// Note: create a config struct for your custom SD type here. +type sdConfig struct { + Address string + TagSeparator string + RefreshInterval int +} + +// Note: This is the struct with your implementation of the Discoverer interface (see Run function). +// Discovery retrieves target information from a Consul server and updates them via watches. +type discovery struct { + address string + refreshInterval int + clientDatacenter string + tagSeparator string + logger log.Logger +} + +func (d *discovery) parseServiceNodes(resp *http.Response, name string) (*targetgroup.Group, error) { + var nodes []*CatalogService + tgroup := targetgroup.Group{ + Source: name, + Labels: make(model.LabelSet), + } + + dec := json.NewDecoder(resp.Body) + defer resp.Body.Close() + err := dec.Decode(&nodes) + + if err != nil { + return &tgroup, err + } + + tgroup.Targets = make([]model.LabelSet, 0, len(nodes)) + + for _, node := range nodes { + // We surround the separated list with the separator as well. This way regular expressions + // in relabeling rules don't have to consider tag positions. + var tags = "," + strings.Join(node.ServiceTags, ",") + "," + + // If the service address is not empty it should be used instead of the node address + // since the service may be registered remotely through a different node. + var addr string + if node.ServiceAddress != "" { + addr = net.JoinHostPort(node.ServiceAddress, fmt.Sprintf("%d", node.ServicePort)) + } else { + addr = net.JoinHostPort(node.Address, fmt.Sprintf("%d", node.ServicePort)) + } + + target := model.LabelSet{model.AddressLabel: model.LabelValue(addr)} + labels := model.LabelSet{ + model.AddressLabel: model.LabelValue(addr), + model.LabelName(addressLabel): model.LabelValue(node.Address), + model.LabelName(nodeLabel): model.LabelValue(node.Node), + model.LabelName(tagsLabel): model.LabelValue(tags), + model.LabelName(serviceAddressLabel): model.LabelValue(node.ServiceAddress), + model.LabelName(servicePortLabel): model.LabelValue(strconv.Itoa(node.ServicePort)), + model.LabelName(serviceIDLabel): model.LabelValue(node.ServiceID), + } + tgroup.Labels = labels + + // Add all key/value pairs from the node's metadata as their own labels. + for k, v := range node.NodeMeta { + name := strutil.SanitizeLabelName(k) + tgroup.Labels[model.LabelName(model.MetaLabelPrefix+name)] = model.LabelValue(v) + } + + tgroup.Targets = append(tgroup.Targets, target) + } + return &tgroup, nil +} + +// Note: you must implement this function for your discovery implementation as part of the +// Discoverer interface. Here you should query your SD for it's list of known targets, determine +// which of those targets you care about (for example, which of Consuls known services do you want +// to scrape for metrics), and then send those targets as a target.TargetGroup to the ch channel. +func (d *discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { + for c := time.Tick(time.Duration(d.refreshInterval) * time.Second); ; { + var srvs map[string][]string + resp, err := http.Get(fmt.Sprintf("http://%s/v1/catalog/services", d.address)) + + if err != nil { + level.Error(d.logger).Log("msg", "Error getting services list", "err", err) + time.Sleep(time.Duration(d.refreshInterval) * time.Second) + continue + } + + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&srvs) + resp.Body.Close() + if err != nil { + level.Error(d.logger).Log("msg", "Error reading services list", "err", err) + time.Sleep(time.Duration(d.refreshInterval) * time.Second) + continue + } + + var tgs []*targetgroup.Group + // Note that we treat errors when querying specific consul services as fatal for for this + // iteration of the time.Tick loop. It's better to have some stale targets than an incomplete + // list of targets simply because there may have been a timeout. If the service is actually + // gone as far as consul is concerned, that will be picked up during the next iteration of + // the outer loop. + for name := range srvs { + if name == "consul" { + continue + } + resp, err := http.Get(fmt.Sprintf("http://%s/v1/catalog/service/%s", d.address, name)) + if err != nil { + level.Error(d.logger).Log("msg", "Error getting services nodes", "service", name, "err", err) + break + } + tg, err := d.parseServiceNodes(resp, name) + if err != nil { + level.Error(d.logger).Log("msg", "Error parsing services nodes", "service", name, "err", err) + break + } + tgs = append(tgs, tg) + } + if err != nil { + // We're returning all Consul services as a single targetgroup. + ch <- tgs + } + // Wait for ticker or exit when ctx is closed. + select { + case <-c: + continue + case <-ctx.Done(): + return + } + } +} + +func newDiscovery(conf sdConfig) (*discovery, error) { + cd := &discovery{ + address: conf.Address, + refreshInterval: conf.RefreshInterval, + tagSeparator: conf.TagSeparator, + logger: logger, + } + return cd, nil +} + +func main() { + a.HelpFlag.Short('h') + + _, err := a.Parse(os.Args[1:]) + if err != nil { + fmt.Println("err: ", err) + return + } + logger = log.NewSyncLogger(log.NewLogfmtLogger(os.Stdout)) + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + + ctx := context.Background() + + // NOTE: create an instance of your new SD implementation here. + cfg := sdConfig{ + TagSeparator: ",", + Address: "localhost:8500", + RefreshInterval: 30, + } + + disc, err := newDiscovery(cfg) + if err != nil { + fmt.Println("err: ", err) + } + sdAdapter := adapter.NewAdapter(ctx, *outputFile, "exampleSD", disc, logger) + sdAdapter.Run() + + <-ctx.Done() +} diff --git a/documentation/examples/custom-sd/adapter/adapter.go b/documentation/examples/custom-sd/adapter/adapter.go new file mode 100644 index 000000000..6452a50f9 --- /dev/null +++ b/documentation/examples/custom-sd/adapter/adapter.go @@ -0,0 +1,151 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +// NOTE: you do not need to edit this file when implementing a custom sd. +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +type customSD struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + +// Adapter runs an unknown service discovery implementation and converts its target groups +// to JSON and writes to a file for file_sd. +type Adapter struct { + ctx context.Context + disc discovery.Discoverer + groups map[string]*customSD + manager *discovery.Manager + output string + name string + logger log.Logger +} + +func mapToArray(m map[string]*customSD) []customSD { + arr := make([]customSD, 0, len(m)) + for _, v := range m { + arr = append(arr, *v) + } + return arr +} + +// Parses incoming target groups updates. If the update contains changes to the target groups +// Adapter already knows about, or new target groups, we Marshal to JSON and write to file. +func (a *Adapter) generateTargetGroups(allTargetGroups map[string][]*targetgroup.Group) { + tempGroups := make(map[string]*customSD) + for k, sdTargetGroups := range allTargetGroups { + for i, group := range sdTargetGroups { + newTargets := make([]string, 0) + newLabels := make(map[string]string) + + for _, targets := range group.Targets { + for _, target := range targets { + newTargets = append(newTargets, string(target)) + } + } + + for name, value := range group.Labels { + newLabels[string(name)] = string(value) + } + // Make a unique key, including the current index, in case the sd_type (map key) and group.Source is not unique. + key := fmt.Sprintf("%s:%s:%d", k, group.Source, i) + tempGroups[key] = &customSD{ + Targets: newTargets, + Labels: newLabels, + } + } + } + if !reflect.DeepEqual(a.groups, tempGroups) { + a.groups = tempGroups + err := a.writeOutput() + if err != nil { + level.Error(log.With(a.logger, "component", "sd-adapter")).Log("err", err) + } + } + +} + +// Writes JSON formatted targets to output file. +func (a *Adapter) writeOutput() error { + arr := mapToArray(a.groups) + b, _ := json.MarshalIndent(arr, "", " ") + + dir, _ := filepath.Split(a.output) + tmpfile, err := ioutil.TempFile(dir, "sd-adapter") + if err != nil { + return err + } + defer tmpfile.Close() + + _, err = tmpfile.Write(b) + if err != nil { + return err + } + + err = os.Rename(tmpfile.Name(), a.output) + if err != nil { + return err + } + return nil +} + +func (a *Adapter) runCustomSD(ctx context.Context) { + updates := a.manager.SyncCh() + for { + select { + case <-ctx.Done(): + case allTargetGroups, ok := <-updates: + // Handle the case that a target provider exits and closes the channel + // before the context is done. + if !ok { + return + } + a.generateTargetGroups(allTargetGroups) + } + } +} + +// Run starts a Discovery Manager and the custom service discovery implementation. +func (a *Adapter) Run() { + go a.manager.Run() + a.manager.StartCustomProvider(a.ctx, a.name, a.disc) + go a.runCustomSD(a.ctx) +} + +// NewAdapter creates a new instance of Adapter. +func NewAdapter(ctx context.Context, file string, name string, d discovery.Discoverer, logger log.Logger) *Adapter { + return &Adapter{ + ctx: ctx, + disc: d, + groups: make(map[string]*customSD), + manager: discovery.NewManager(ctx, logger), + output: file, + name: name, + logger: logger, + } +}