Add initial file based service discovery.

This commit adds file based service discovery which reads target
groups from specified files. It detects changes based on file watches
and regular refreshes.
This commit is contained in:
Fabian Reinartz 2015-05-13 12:03:31 +02:00
parent 36016cb308
commit 93548a8882
10 changed files with 404 additions and 23 deletions

View file

@ -1,6 +1,7 @@
package config
import (
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
@ -16,7 +17,7 @@ import (
var (
patJobName = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_-]*$`)
patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml)$`)
patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`)
)
// Load parses the YAML input s into a Config.
@ -262,6 +263,28 @@ func (tg TargetGroup) MarshalYAML() (interface{}, error) {
return g, nil
}
// UnmarshalJSON implements the json.Unmarshaller interface.
func (tg *TargetGroup) UnmarshalJSON(b []byte) error {
	// Use json struct tags: encoding/json ignores yaml tags and only
	// matched the keys via its case-insensitive field-name fallback before.
	g := struct {
		Targets []string             `json:"targets"`
		Labels  clientmodel.LabelSet `json:"labels"`
	}{}
	if err := json.Unmarshal(b, &g); err != nil {
		return err
	}

	tg.Targets = make([]clientmodel.LabelSet, 0, len(g.Targets))
	for _, t := range g.Targets {
		// Targets must be plain host[:port] strings; a slash indicates a
		// URL or path, which is not a valid target address.
		if strings.Contains(t, "/") {
			return fmt.Errorf("%q is not a valid hostname", t)
		}
		tg.Targets = append(tg.Targets, clientmodel.LabelSet{
			clientmodel.AddressLabel: clientmodel.LabelValue(t),
		})
	}
	tg.Labels = g.Labels
	return nil
}
// DNSSDConfig is the configuration for DNS based service discovery.
type DNSSDConfig struct {
// DefaultedDNSSDConfig contains the actual fields for DNSSDConfig.
@ -301,7 +324,7 @@ func (c *FileSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}
if len(c.Names) == 0 {
return fmt.Errorf("file discovery config must contain at least on path name")
return fmt.Errorf("file service discovery config must contain at least one path name")
}
for _, name := range c.Names {
if !patFileSDName.MatchString(name) {

View file

@ -54,7 +54,7 @@ var expectedConf = &Config{DefaultedConfig{
FileSDConfigs: []*FileSDConfig{
{DefaultedFileSDConfig{
Names: []string{"foo/*.slow.json", "foo/*.slow.yml"},
Names: []string{"foo/*.slow.json", "foo/*.slow.yml", "single/file.yml"},
RefreshInterval: Duration(10 * time.Minute),
}},
{DefaultedFileSDConfig{

View file

@ -28,6 +28,7 @@ scrape_configs:
- names:
- foo/*.slow.json
- foo/*.slow.yml
- single/file.yml
refresh_interval: 10m
- names:
- bar/*.yaml

249
retrieval/discovery/file.go Normal file
View file

@ -0,0 +1,249 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"time"
"github.com/golang/glog"
"gopkg.in/fsnotify.v1"
"gopkg.in/yaml.v2"
"github.com/prometheus/prometheus/config"
)
// FileDiscovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes.
type FileDiscovery struct {
	// paths holds the configured glob patterns for discovery files.
	paths []string
	// watcher is created in Run; watchFiles panics if it is still nil.
	watcher *fsnotify.Watcher
	// interval is the period of the fallback refresh ticker.
	interval time.Duration
	// done signals the Run loop (and the drain goroutine in Stop) to exit.
	done chan struct{}

	// lastRefresh stores which files were found during the last refresh
	// and how many target groups they contained.
	// This is used to detect deleted target groups.
	lastRefresh map[string]int
}
// NewFileDiscovery returns a new file discovery for the given paths.
func NewFileDiscovery(paths []string, interval time.Duration) *FileDiscovery {
	return &FileDiscovery{
		paths:    paths,
		interval: interval,
		done:     make(chan struct{}),
	}
}
// Sources implements the TargetProvider interface.
func (fd *FileDiscovery) Sources() []string {
	var srcs []string
	// As we allow multiple target groups per file we have no choice
	// but to parse them all.
	for _, p := range fd.listFiles() {
		tgroups, err := readFile(p)
		if err != nil {
			// BUG fix: the original format string had no verb for err,
			// so the error itself was never printed.
			glog.Errorf("Error reading file %q: %s", p, err)
		}
		for _, tg := range tgroups {
			srcs = append(srcs, tg.Source)
		}
	}
	return srcs
}
// listFiles returns a list of all files that match the configured patterns.
func (fd *FileDiscovery) listFiles() []string {
	var matched []string
	for _, pattern := range fd.paths {
		names, err := filepath.Glob(pattern)
		if err != nil {
			glog.Errorf("Error expanding glob %q: %s", pattern, err)
			continue
		}
		matched = append(matched, names...)
	}
	return matched
}
// watchFiles sets watches on all full paths or directories that were configured for
// this file discovery.
func (fd *FileDiscovery) watchFiles() {
	if fd.watcher == nil {
		panic("no watcher configured")
	}
	for _, path := range fd.paths {
		// fsnotify cannot watch glob patterns directly, so watch the
		// directory portion of each configured path instead.
		dir := "./"
		if idx := strings.LastIndex(path, "/"); idx > -1 {
			dir = path[:idx]
		}
		if err := fd.watcher.Add(dir); err != nil {
			glog.Errorf("Error adding file watch for %q: %s", dir, err)
		}
	}
}
// Run implements the TargetProvider interface.
// It performs an initial refresh, then re-reads the discovery files whenever
// a file watch fires or the fallback interval elapses, sending updated target
// groups on ch. The channel is closed when the provider terminates.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) {
	defer close(ch)

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		glog.Errorf("Error creating file watcher: %s", err)
		return
	}
	fd.watcher = watcher

	// Initial load; this also installs the file watches via watchFiles.
	fd.refresh(ch)

	ticker := time.NewTicker(fd.interval)
	defer ticker.Stop()

	for {
		// Stopping has priority over refreshing. Thus we wrap the actual select
		// clause to always catch done signals.
		select {
		case <-fd.done:
			return
		default:
			select {
			case event := <-fd.watcher.Events:
				// fsnotify sometimes sends a bunch of events without name or operation.
				// It's unclear what they are and why they are sent - filter them out.
				if len(event.Name) == 0 {
					break
				}
				// Everything but a chmod requires rereading.
				if event.Op^fsnotify.Chmod == 0 {
					break
				}
				// Changes to a file can spawn various sequences of events with
				// different combinations of operations. For all practical purposes
				// this is inaccurate.
				// The most reliable solution is to reload everything if anything happens.
				fd.refresh(ch)

			case <-ticker.C:
				// Setting a new watch after an update might fail. Make sure we don't lose
				// those files forever.
				fd.refresh(ch)

			case err := <-fd.watcher.Errors:
				if err != nil {
					glog.Errorf("Error on file watch: %s", err)
				}

			case <-fd.done:
				return
			}
		}
	}
}
// refresh reads all files matching the discoveries patterns and sends the respective
// updated target groups through the channel.
// It also emits empty target groups for source IDs that existed in the previous
// refresh but have disappeared, so consumers can drop their targets.
func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
	// ref maps each readable file to the number of target groups it contains
	// in this refresh round; it becomes the new lastRefresh below.
	ref := map[string]int{}
	for _, p := range fd.listFiles() {
		tgroups, err := readFile(p)
		if err != nil {
			glog.Errorf("Error reading file %q: %s", p, err)
			// Prevent deletion down below.
			ref[p] = fd.lastRefresh[p]
			continue
		}
		for _, tg := range tgroups {
			ch <- tg
		}
		ref[p] = len(tgroups)
	}
	// Send empty updates for sources that disappeared.
	for f, n := range fd.lastRefresh {
		m, ok := ref[f]
		if !ok || n > m {
			// Source IDs are positional (fileSource), so groups beyond the
			// new count m are the ones that vanished.
			for i := m; i < n; i++ {
				ch <- &config.TargetGroup{Source: fileSource(f, i)}
			}
		}
	}
	fd.lastRefresh = ref

	// Re-install watches in case paths matched new files or adding a
	// watch failed earlier.
	fd.watchFiles()
}
// fileSource returns a source ID for the target group at index idx in the file.
func fileSource(filename string, idx int) string {
	return fmt.Sprintf("file:%s:%d", filename, idx)
}
// Stop implements the TargetProvider interface.
// It terminates the Run loop, drains pending watcher events so closing the
// watcher cannot deadlock, and finally stops the drain goroutine.
func (fd *FileDiscovery) Stop() {
	// BUG fix: glog.V(1).Info does not interpret format verbs; Infof does.
	glog.V(1).Infof("Stopping file discovery for %s...", fd.paths)

	// Signal the Run loop to terminate.
	fd.done <- struct{}{}

	// Closing the watcher will deadlock unless all events and errors are drained.
	go func() {
		for {
			select {
			case <-fd.watcher.Errors:
			case <-fd.watcher.Events:
				// Drain all events and errors.
			case <-fd.done:
				return
			}
		}
	}()
	fd.watcher.Close()

	// Terminate the drain goroutine.
	fd.done <- struct{}{}

	glog.V(1).Infof("File discovery for %s stopped.", fd.paths)
}
// readFile reads a JSON or YAML list of targets groups from the file, depending on its
// file extension. It returns full configuration target groups with their
// positional source IDs set.
func readFile(filename string) ([]*config.TargetGroup, error) {
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	var targetGroups []*config.TargetGroup

	switch ext := filepath.Ext(filename); strings.ToLower(ext) {
	case ".json":
		if err := json.Unmarshal(content, &targetGroups); err != nil {
			return nil, err
		}
	case ".yml", ".yaml":
		if err := yaml.Unmarshal(content, &targetGroups); err != nil {
			return nil, err
		}
	default:
		// Return an error instead of panicking: config validation should
		// rule this out, but a stray file matched by a glob must not be
		// able to crash the server. Callers already log returned errors.
		return nil, fmt.Errorf("retrieval.FileDiscovery.readFile: unhandled file extension %q", ext)
	}

	// Source IDs are position-based so refresh can detect deleted groups.
	for i, tg := range targetGroups {
		tg.Source = fileSource(filename, i)
	}
	return targetGroups, nil
}

View file

@ -0,0 +1,93 @@
package discovery
import (
"fmt"
"io"
"os"
"testing"
"time"
"github.com/prometheus/prometheus/config"
)
// TestFileSD exercises file-based discovery for both supported formats and
// removes the generated fixture files when done.
func TestFileSD(t *testing.T) {
	defer os.Remove("fixtures/_test.yml")
	defer os.Remove("fixtures/_test.json")

	testFileSD(t, ".yml")
	testFileSD(t, ".json")
}
// testFileSD runs a file discovery over fixtures with the given extension and
// verifies that creating and corrupting a discovery file produces the
// expected target group updates.
func testFileSD(t *testing.T, ext string) {
	// As interval refreshing is more of a fallback, we only want to test
	// whether file watches work as expected.
	fsd := NewFileDiscovery([]string{"fixtures/_*" + ext}, 1*time.Hour)

	ch := make(chan *config.TargetGroup)
	go fsd.Run(ch)
	defer fsd.Stop()

	// No matching file exists yet, so nothing must be sent.
	select {
	case <-time.After(25 * time.Millisecond):
		// Expected.
	case tg := <-ch:
		t.Fatalf("Unexpected target group in file discovery: %s", tg)
	}

	// Copy the fixture into a file matched by the discovery pattern.
	newf, err := os.Create("fixtures/_test" + ext)
	if err != nil {
		t.Fatal(err)
	}
	defer newf.Close()

	f, err := os.Open("fixtures/target_groups" + ext)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()
	_, err = io.Copy(newf, f)
	if err != nil {
		t.Fatal(err)
	}
	newf.Close()

	// The files contain two target groups which are read and sent in order.
	select {
	case <-time.After(15 * time.Second):
		t.Fatalf("Expected new target group but got none")
	case tg := <-ch:
		if tg.String() != fmt.Sprintf("file:fixtures/_test%s:0", ext) {
			// BUG fix: the original Fatalf call had no format verb for tg.
			t.Fatalf("Unexpected target group %s", tg)
		}
	}
	select {
	case <-time.After(15 * time.Second):
		t.Fatalf("Expected new target group but got none")
	case tg := <-ch:
		if tg.String() != fmt.Sprintf("file:fixtures/_test%s:1", ext) {
			t.Fatalf("Unexpected target group %s", tg)
		}
	}

	// Based on unknown circumstances, sometimes fsnotify will trigger more events in
	// some runs (which might be empty, chains of different operations etc.).
	// We have to drain those (as the target manager would) to avoid deadlocking and must
	// not try to make sense of it all...
	go func() {
		for tg := range ch {
			// Below we will change the file to a bad syntax. Previously extracted target
			// groups must not be deleted via sending an empty target group.
			if len(tg.Targets) == 0 {
				// BUG fix: t.Fatalf must not be called from a goroutine
				// other than the one running the test; Errorf records
				// the failure safely.
				t.Errorf("Unexpected empty target group received: %s", tg)
			}
		}
	}()

	newf, err = os.Create("fixtures/_test" + ext)
	if err != nil {
		t.Fatal(err)
	}
	if _, err := newf.Write([]byte("]gibberish\n][")); err != nil {
		t.Fatal(err)
	}
	newf.Close()
}

View file

@ -0,0 +1,11 @@
[
{
"targets": ["localhost:9090", "example.org:443"],
"labels": {
"foo": "bar"
}
},
{
"targets": ["my.domain"]
}
]

View file

@ -0,0 +1,5 @@
- targets: ['localhost:9090', 'example.org:443']
labels:
test: success
- targets: ['my.domain']

View file

@ -213,8 +213,6 @@ func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSe
}
func (t *target) String() string {
t.RLock()
defer t.RUnlock()
return t.url.Host
}
@ -402,22 +400,6 @@ func (t *target) URL() string {
// InstanceIdentifier implements Target.
func (t *target) InstanceIdentifier() string {
// If we are given a port in the host port, use that.
if strings.Contains(t.url.Host, ":") {
return t.url.Host
}
t.RLock()
defer t.RUnlock()
// Otherwise, deduce port based on protocol.
if t.url.Scheme == "http" {
return fmt.Sprintf("%s:80", t.url.Host)
} else if t.url.Scheme == "https" {
return fmt.Sprintf("%s:443", t.url.Host)
}
glog.Warningf("Unknown scheme %s when generating identifier, using host without port number.", t.url.Scheme)
return t.url.Host
}

View file

@ -34,7 +34,7 @@ func TestTargetInterface(t *testing.T) {
}
func TestBaseLabels(t *testing.T) {
target := newTestTarget("example.com", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"})
target := newTestTarget("example.com:80", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"})
want := clientmodel.LabelSet{
clientmodel.JobLabel: "some_job",
clientmodel.InstanceLabel: "example.com:80",
@ -89,7 +89,7 @@ func TestTargetScrapeWithFullChannel(t *testing.T) {
}
func TestTargetRecordScrapeHealth(t *testing.T) {
testTarget := newTestTarget("example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"})
testTarget := newTestTarget("example.url:80", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"})
now := clientmodel.Now()
appender := &collectResultAppender{}

View file

@ -294,6 +294,19 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc
targets := make([]Target, 0, len(tg.Targets))
for i, labels := range tg.Targets {
addr := string(labels[clientmodel.AddressLabel])
// If no port was provided, infer it based on the used scheme.
if !strings.Contains(addr, ":") {
switch cfg.Scheme {
case "http":
addr = fmt.Sprintf("%s:80", addr)
case "https":
addr = fmt.Sprintf("%s:443", addr)
default:
panic(fmt.Errorf("targetsFromGroup: invalid scheme %q", cfg.Scheme))
}
labels[clientmodel.AddressLabel] = clientmodel.LabelValue(addr)
}
// Copy labels into the labelset for the target if they are not
// set already. Apply the labelsets in order of decreasing precedence.
labelsets := []clientmodel.LabelSet{
@ -347,6 +360,10 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider {
dnsSD := discovery.NewDNSDiscovery(dnscfg.Names, time.Duration(dnscfg.RefreshInterval))
providers = append(providers, dnsSD)
}
for _, filecfg := range cfg.FileSDConfigs {
fileSD := discovery.NewFileDiscovery(filecfg.Names, time.Duration(filecfg.RefreshInterval))
providers = append(providers, fileSD)
}
if len(cfg.TargetGroups) > 0 {
providers = append(providers, NewStaticProvider(cfg.TargetGroups))
}