diff --git a/config/config.go b/config/config.go index 81ab4fb06f..5da03e2de4 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "encoding/json" "fmt" "io/ioutil" "regexp" @@ -16,7 +17,7 @@ import ( var ( patJobName = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_-]*$`) - patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml)$`) + patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`) ) // Load parses the YAML input s into a Config. @@ -262,6 +263,28 @@ func (tg TargetGroup) MarshalYAML() (interface{}, error) { return g, nil } +// UnmarshalJSON implements the json.Unmarshaler interface. +func (tg *TargetGroup) UnmarshalJSON(b []byte) error { + g := struct { + Targets []string `json:"targets"` + Labels clientmodel.LabelSet `json:"labels"` + }{} + if err := json.Unmarshal(b, &g); err != nil { + return err + } + tg.Targets = make([]clientmodel.LabelSet, 0, len(g.Targets)) + for _, t := range g.Targets { + if strings.Contains(t, "/") { + return fmt.Errorf("%q is not a valid hostname", t) + } + tg.Targets = append(tg.Targets, clientmodel.LabelSet{ + clientmodel.AddressLabel: clientmodel.LabelValue(t), + }) + } + tg.Labels = g.Labels + return nil +} + // DNSSDConfig is the configuration for DNS based service discovery. type DNSSDConfig struct { // DefaultedDNSSDConfig contains the actual fields for DNSSDConfig. 
@@ -301,7 +324,7 @@ func (c *FileSDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } if len(c.Names) == 0 { - return fmt.Errorf("file discovery config must contain at least on path name") + return fmt.Errorf("file service discovery config must contain at least one path name") } for _, name := range c.Names { if !patFileSDName.MatchString(name) { diff --git a/config/config_test.go b/config/config_test.go index 40ea0fa7e2..ad21d3e0ca 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -54,7 +54,7 @@ var expectedConf = &Config{DefaultedConfig{ FileSDConfigs: []*FileSDConfig{ {DefaultedFileSDConfig{ - Names: []string{"foo/*.slow.json", "foo/*.slow.yml"}, + Names: []string{"foo/*.slow.json", "foo/*.slow.yml", "single/file.yml"}, RefreshInterval: Duration(10 * time.Minute), }}, {DefaultedFileSDConfig{ diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index cde9eeed68..6af1db01c5 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -28,6 +28,7 @@ scrape_configs: - names: - foo/*.slow.json - foo/*.slow.yml + - single/file.yml refresh_interval: 10m - names: - bar/*.yaml diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go new file mode 100644 index 0000000000..cda4cbc1a6 --- /dev/null +++ b/retrieval/discovery/file.go @@ -0,0 +1,249 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package discovery + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "path/filepath" + "strings" + "time" + + "github.com/golang/glog" + "gopkg.in/fsnotify.v1" + "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/config" +) + +// FileDiscovery provides service discovery functionality based +// on files that contain target groups in JSON or YAML format. Refreshing +// happens using file watches and periodic refreshes. +type FileDiscovery struct { + paths []string + watcher *fsnotify.Watcher + interval time.Duration + done chan struct{} + + // lastRefresh stores which files were found during the last refresh + // and how many target groups they contained. + // This is used to detect deleted target groups. + lastRefresh map[string]int +} + +// NewFileDiscovery returns a new file discovery for the given paths. +func NewFileDiscovery(paths []string, interval time.Duration) *FileDiscovery { + fd := &FileDiscovery{ + paths: paths, + interval: interval, + done: make(chan struct{}), + } + return fd +} + +// Sources implements the TargetProvider interface. +func (fd *FileDiscovery) Sources() []string { + var srcs []string + // As we allow multiple target groups per file we have no choice + // but to parse them all. + for _, p := range fd.listFiles() { + tgroups, err := readFile(p) + if err != nil { + glog.Errorf("Error reading file %q: %s", p, err) + } + for _, tg := range tgroups { + srcs = append(srcs, tg.Source) + } + } + return srcs +} + +// listFiles returns a list of all files that match the configured patterns. +func (fd *FileDiscovery) listFiles() []string { + var paths []string + for _, p := range fd.paths { + files, err := filepath.Glob(p) + if err != nil { + glog.Errorf("Error expanding glob %q: %s", p, err) + continue + } + paths = append(paths, files...) + } + return paths +} + +// watchFiles sets watches on all full paths or directories that were configured for +// this file discovery. 
+func (fd *FileDiscovery) watchFiles() { + if fd.watcher == nil { + panic("no watcher configured") + } + for _, p := range fd.paths { + if idx := strings.LastIndex(p, "/"); idx > -1 { + p = p[:idx] + } else { + p = "./" + } + if err := fd.watcher.Add(p); err != nil { + glog.Errorf("Error adding file watch for %q: %s", p, err) + } + } +} + +// Run implements the TargetProvider interface. +func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup) { + defer close(ch) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + glog.Errorf("Error creating file watcher: %s", err) + return + } + fd.watcher = watcher + + fd.refresh(ch) + + ticker := time.NewTicker(fd.interval) + defer ticker.Stop() + + for { + // Stopping has priority over refreshing. Thus we wrap the actual select + // clause to always catch done signals. + select { + case <-fd.done: + return + default: + select { + case event := <-fd.watcher.Events: + // fsnotify sometimes sends a bunch of events without name or operation. + // It's unclear what they are and why they are sent - filter them out. + if len(event.Name) == 0 { + break + } + // Everything but a chmod requires rereading. + if event.Op^fsnotify.Chmod == 0 { + break + } + // Changes to a file can spawn various sequences of events with + // different combinations of operations. For all practical purposes + // this is inaccurate. + // The most reliable solution is to reload everything if anything happens. + fd.refresh(ch) + + case <-ticker.C: + // Setting a new watch after an update might fail. Make sure we don't lose + // those files forever. + fd.refresh(ch) + + case err := <-fd.watcher.Errors: + if err != nil { + glog.Errorf("Error on file watch: %s", err) + } + + case <-fd.done: + return + } + } + } +} + +// refresh reads all files matching the discoveries patterns and sends the respective +// updated target groups through the channel. 
+func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) { + ref := map[string]int{} + for _, p := range fd.listFiles() { + tgroups, err := readFile(p) + if err != nil { + glog.Errorf("Error reading file %q: %s", p, err) + // Prevent deletion down below. + ref[p] = fd.lastRefresh[p] + continue + } + for _, tg := range tgroups { + ch <- tg + } + ref[p] = len(tgroups) + } + // Send empty updates for sources that disappeared. + for f, n := range fd.lastRefresh { + m, ok := ref[f] + if !ok || n > m { + for i := m; i < n; i++ { + ch <- &config.TargetGroup{Source: fileSource(f, i)} + } + } + } + fd.lastRefresh = ref + + fd.watchFiles() +} + +// fileSource returns a source ID for the i-th target group in the file. +func fileSource(filename string, i int) string { + return fmt.Sprintf("file:%s:%d", filename, i) +} + +// Stop implements the TargetProvider interface. +func (fd *FileDiscovery) Stop() { + glog.V(1).Infof("Stopping file discovery for %s...", fd.paths) + + fd.done <- struct{}{} + // Closing the watcher will deadlock unless all events and errors are drained. + go func() { + for { + select { + case <-fd.watcher.Errors: + case <-fd.watcher.Events: + // Drain all events and errors. + case <-fd.done: + return + } + } + }() + fd.watcher.Close() + + fd.done <- struct{}{} + + glog.V(1).Infof("File discovery for %s stopped.", fd.paths) +} + +// readFile reads a JSON or YAML list of target groups from the file, depending on its +// file extension. It returns full configuration target groups. 
+func readFile(filename string) ([]*config.TargetGroup, error) { + content, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + + var targetGroups []*config.TargetGroup + + switch ext := filepath.Ext(filename); strings.ToLower(ext) { + case ".json": + if err := json.Unmarshal(content, &targetGroups); err != nil { + return nil, err + } + case ".yml", ".yaml": + if err := yaml.Unmarshal(content, &targetGroups); err != nil { + return nil, err + } + default: + panic(fmt.Errorf("retrieval.FileDiscovery.readFile: unhandled file extension %q", ext)) + } + + for i, tg := range targetGroups { + tg.Source = fileSource(filename, i) + } + return targetGroups, nil +} diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go new file mode 100644 index 0000000000..ca020dc983 --- /dev/null +++ b/retrieval/discovery/file_test.go @@ -0,0 +1,93 @@ +package discovery + +import ( + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/prometheus/prometheus/config" +) + +func TestFileSD(t *testing.T) { + testFileSD(t, ".yml") + testFileSD(t, ".json") + os.Remove("fixtures/_test.yml") + os.Remove("fixtures/_test.json") +} + +func testFileSD(t *testing.T, ext string) { + // As interval refreshing is more of a fallback, we only want to test + // whether file watches work as expected. + fsd := NewFileDiscovery([]string{"fixtures/_*" + ext}, 1*time.Hour) + + ch := make(chan *config.TargetGroup) + go fsd.Run(ch) + defer fsd.Stop() + + select { + case <-time.After(25 * time.Millisecond): + // Expected. 
+ case tg := <-ch: + t.Fatalf("Unexpected target group in file discovery: %s", tg) + } + + newf, err := os.Create("fixtures/_test" + ext) + if err != nil { + t.Fatal(err) + } + defer newf.Close() + + f, err := os.Open("fixtures/target_groups" + ext) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + _, err = io.Copy(newf, f) + if err != nil { + t.Fatal(err) + } + newf.Close() + + // The files contain two target groups which are read and sent in order. + select { + case <-time.After(15 * time.Second): + t.Fatalf("Expected new target group but got none") + case tg := <-ch: + if tg.String() != fmt.Sprintf("file:fixtures/_test%s:0", ext) { + t.Fatalf("Unexpected target group %s", tg) + } + } + select { + case <-time.After(15 * time.Second): + t.Fatalf("Expected new target group but got none") + case tg := <-ch: + if tg.String() != fmt.Sprintf("file:fixtures/_test%s:1", ext) { + t.Fatalf("Unexpected target group %s", tg) + } + } + // Based on unknown circumstances, sometimes fsnotify will trigger more events in + // some runs (which might be empty, chains of different operations etc.). + // We have to drain those (as the target manager would) to avoid deadlocking and must + // not try to make sense of it all... + go func() { + for tg := range ch { + // Below we will change the file to a bad syntax. Previously extracted target + // groups must not be deleted via sending an empty target group. 
+ if len(tg.Targets) == 0 { + t.Errorf("Unexpected empty target group received: %s", tg) + } + } + }() + + newf, err = os.Create("fixtures/_test" + ext) + if err != nil { + t.Fatal(err) + } + if _, err := newf.Write([]byte("]gibberish\n][")); err != nil { + t.Fatal(err) + } + newf.Close() +} diff --git a/retrieval/discovery/fixtures/target_groups.json b/retrieval/discovery/fixtures/target_groups.json new file mode 100644 index 0000000000..df4f0df199 --- /dev/null +++ b/retrieval/discovery/fixtures/target_groups.json @@ -0,0 +1,11 @@ +[ + { + "targets": ["localhost:9090", "example.org:443"], + "labels": { + "foo": "bar" + } + }, + { + "targets": ["my.domain"] + } +] diff --git a/retrieval/discovery/fixtures/target_groups.yml b/retrieval/discovery/fixtures/target_groups.yml new file mode 100644 index 0000000000..a4823b2b98 --- /dev/null +++ b/retrieval/discovery/fixtures/target_groups.yml @@ -0,0 +1,5 @@ +- targets: ['localhost:9090', 'example.org:443'] + labels: + test: success + +- targets: ['my.domain'] diff --git a/retrieval/target.go b/retrieval/target.go index 2f40ec132f..f0342bae2c 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -213,8 +213,6 @@ func (t *target) Update(cfg *config.ScrapeConfig, baseLabels clientmodel.LabelSe } func (t *target) String() string { - t.RLock() - defer t.RUnlock() return t.url.Host } @@ -402,22 +400,6 @@ func (t *target) URL() string { // InstanceIdentifier implements Target. func (t *target) InstanceIdentifier() string { - // If we are given a port in the host port, use that. - if strings.Contains(t.url.Host, ":") { - return t.url.Host - } - - t.RLock() - defer t.RUnlock() - - // Otherwise, deduce port based on protocol. 
- if t.url.Scheme == "http" { - return fmt.Sprintf("%s:80", t.url.Host) - } else if t.url.Scheme == "https" { - return fmt.Sprintf("%s:443", t.url.Host) - } - - glog.Warningf("Unknown scheme %s when generating identifier, using host without port number.", t.url.Scheme) return t.url.Host } diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 062d275687..857bd7dd81 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -34,7 +34,7 @@ func TestTargetInterface(t *testing.T) { } func TestBaseLabels(t *testing.T) { - target := newTestTarget("example.com", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) + target := newTestTarget("example.com:80", 0, clientmodel.LabelSet{"job": "some_job", "foo": "bar"}) want := clientmodel.LabelSet{ clientmodel.JobLabel: "some_job", clientmodel.InstanceLabel: "example.com:80", @@ -89,7 +89,7 @@ func TestTargetScrapeWithFullChannel(t *testing.T) { } func TestTargetRecordScrapeHealth(t *testing.T) { - testTarget := newTestTarget("example.url", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}) + testTarget := newTestTarget("example.url:80", 0, clientmodel.LabelSet{clientmodel.JobLabel: "testjob"}) now := clientmodel.Now() appender := &collectResultAppender{} diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index 43d9d165a3..8c89e10896 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -294,6 +294,19 @@ func (tm *TargetManager) targetsFromGroup(tg *config.TargetGroup, cfg *config.Sc targets := make([]Target, 0, len(tg.Targets)) for i, labels := range tg.Targets { + addr := string(labels[clientmodel.AddressLabel]) + // If no port was provided, infer it based on the used scheme. 
+ if !strings.Contains(addr, ":") { + switch cfg.Scheme { + case "http": + addr = fmt.Sprintf("%s:80", addr) + case "https": + addr = fmt.Sprintf("%s:443", addr) + default: + panic(fmt.Errorf("targetsFromGroup: invalid scheme %q", cfg.Scheme)) + } + labels[clientmodel.AddressLabel] = clientmodel.LabelValue(addr) + } // Copy labels into the labelset for the target if they are not // set already. Apply the labelsets in order of decreasing precedence. labelsets := []clientmodel.LabelSet{ @@ -347,6 +360,10 @@ func ProvidersFromConfig(cfg *config.ScrapeConfig) []TargetProvider { dnsSD := discovery.NewDNSDiscovery(dnscfg.Names, time.Duration(dnscfg.RefreshInterval)) providers = append(providers, dnsSD) } + for _, filecfg := range cfg.FileSDConfigs { + fileSD := discovery.NewFileDiscovery(filecfg.Names, time.Duration(filecfg.RefreshInterval)) + providers = append(providers, fileSD) + } if len(cfg.TargetGroups) > 0 { providers = append(providers, NewStaticProvider(cfg.TargetGroups)) }