// Copyright 2015 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License.package discovery package discovery import ( "fmt" "net/http" "strconv" "strings" "sync" "time" "github.com/prometheus/log" consul "github.com/hashicorp/consul/api" clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/config" ) const ( consulWatchTimeout = 30 * time.Second consulRetryInterval = 15 * time.Second // ConsuleAddressLabel is the name for the label containing a target's address. ConsulAddressLabel = clientmodel.MetaLabelPrefix + "consul_address" // ConsuleNodeLabel is the name for the label containing a target's node name. ConsulNodeLabel = clientmodel.MetaLabelPrefix + "consul_node" // ConsulTagsLabel is the name of the label containing the tags assigned to the target. ConsulTagsLabel = clientmodel.MetaLabelPrefix + "consul_tags" // ConsulServiceLabel is the name of the label containing the service name. ConsulServiceLabel = clientmodel.MetaLabelPrefix + "consul_service" // ConsulServiceAddressLabel is the name of the label containing the (optional) service address. ConsulServiceAddressLabel = clientmodel.MetaLabelPrefix + "consul_service_address" // ConsulServicePortLabel is the name of the label containing the service port. ConsulServicePortLabel = clientmodel.MetaLabelPrefix + "consul_service_port" // ConsulDCLabel is the name of the label containing the datacenter ID. ConsulDCLabel = clientmodel.MetaLabelPrefix + "consul_dc" ) // ConsulDiscovery retrieves target information from a Consul server // and updates them via watches. type ConsulDiscovery struct { client *consul.Client clientConf *consul.Config tagSeparator string scrapedServices map[string]struct{} mu sync.RWMutex services map[string]*consulService runDone, srvsDone chan struct{} } // consulService contains data belonging to the same service. type consulService struct { name string tgroup *config.TargetGroup lastIndex uint64 removed bool running bool done chan struct{} } // NewConsulDiscovery returns a new ConsulDiscovery for the given config. func NewConsulDiscovery(conf *config.ConsulSDConfig) *ConsulDiscovery { clientConf := &consul.Config{ Address: conf.Server, Scheme: conf.Scheme, Datacenter: conf.Datacenter, Token: conf.Token, HttpAuth: &consul.HttpBasicAuth{ Username: conf.Username, Password: conf.Password, }, } client, err := consul.NewClient(clientConf) if err != nil { // NewClient always returns a nil error. panic(fmt.Errorf("discovery.NewConsulDiscovery: %s", err)) } cd := &ConsulDiscovery{ client: client, clientConf: clientConf, tagSeparator: conf.TagSeparator, runDone: make(chan struct{}), srvsDone: make(chan struct{}, 1), scrapedServices: map[string]struct{}{}, services: map[string]*consulService{}, } for _, name := range conf.Services { cd.scrapedServices[name] = struct{}{} } return cd } // Sources implements the TargetProvider interface. func (cd *ConsulDiscovery) Sources() []string { clientConf := *cd.clientConf clientConf.HttpClient = &http.Client{Timeout: 5 * time.Second} client, err := consul.NewClient(&clientConf) if err != nil { // NewClient always returns a nil error. panic(fmt.Errorf("discovery.ConsulDiscovery.Sources: %s", err)) } srvs, _, err := client.Catalog().Services(nil) if err != nil { log.Errorf("Error refreshing service list: %s", err) return nil } cd.mu.Lock() defer cd.mu.Unlock() srcs := make([]string, 0, len(srvs)) for name := range srvs { if _, ok := cd.scrapedServices[name]; ok { srcs = append(srcs, name) } } return srcs } // Run implements the TargetProvider interface. func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup) { defer close(ch) update := make(chan *consulService, 10) go cd.watchServices(update) for { select { case <-cd.runDone: return case srv := <-update: if srv.removed { ch <- &config.TargetGroup{Source: srv.name} break } // Launch watcher for the service. if !srv.running { go cd.watchService(srv, ch) srv.running = true } } } } // Stop implements the TargetProvider interface. func (cd *ConsulDiscovery) Stop() { log.Debugf("Stopping Consul service discovery for %s", cd.clientConf.Address) // The lock prevents Run from terminating while the watchers attempt // to send on their channels. cd.mu.Lock() defer cd.mu.Unlock() // The watching goroutines will terminate after their next watch timeout. // As this can take long, the channel is buffered and we do not wait. for _, srv := range cd.services { srv.done <- struct{}{} } cd.srvsDone <- struct{}{} // Terminate Run. cd.runDone <- struct{}{} log.Debugf("Consul service discovery for %s stopped.", cd.clientConf.Address) } // watchServices retrieves updates from Consul's services endpoint and sends // potential updates to the update channel. func (cd *ConsulDiscovery) watchServices(update chan<- *consulService) { var lastIndex uint64 for { catalog := cd.client.Catalog() srvs, meta, err := catalog.Services(&consul.QueryOptions{ WaitIndex: lastIndex, WaitTime: consulWatchTimeout, }) if err != nil { log.Errorf("Error refreshing service list: %s", err) <-time.After(consulRetryInterval) continue } // If the index equals the previous one, the watch timed out with no update. if meta.LastIndex == lastIndex { continue } lastIndex = meta.LastIndex cd.mu.Lock() select { case <-cd.srvsDone: cd.mu.Unlock() return default: // Continue. } // Check for new services. for name := range srvs { if _, ok := cd.scrapedServices[name]; !ok { continue } srv, ok := cd.services[name] if !ok { srv = &consulService{ name: name, tgroup: &config.TargetGroup{}, done: make(chan struct{}, 1), } srv.tgroup.Source = name cd.services[name] = srv } srv.tgroup.Labels = clientmodel.LabelSet{ ConsulServiceLabel: clientmodel.LabelValue(name), ConsulDCLabel: clientmodel.LabelValue(cd.clientConf.Datacenter), } update <- srv } // Check for removed services. for name, srv := range cd.services { if _, ok := srvs[name]; !ok { srv.removed = true update <- srv srv.done <- struct{}{} delete(cd.services, name) } } cd.mu.Unlock() } } // watchService retrieves updates about srv from Consul's service endpoint. // On a potential update the resulting target group is sent to ch. func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) { catalog := cd.client.Catalog() for { nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{ WaitIndex: srv.lastIndex, WaitTime: consulWatchTimeout, }) if err != nil { log.Errorf("Error refreshing service %s: %s", srv.name, err) <-time.After(consulRetryInterval) continue } // If the index equals the previous one, the watch timed out with no update. if meta.LastIndex == srv.lastIndex { continue } srv.lastIndex = meta.LastIndex srv.tgroup.Targets = make([]clientmodel.LabelSet, 0, len(nodes)) for _, node := range nodes { addr := fmt.Sprintf("%s:%d", node.Address, node.ServicePort) // We surround the separated list with the separator as well. This way regular expressions // in relabeling rules don't have to consider tag positions. tags := cd.tagSeparator + strings.Join(node.ServiceTags, cd.tagSeparator) + cd.tagSeparator srv.tgroup.Targets = append(srv.tgroup.Targets, clientmodel.LabelSet{ clientmodel.AddressLabel: clientmodel.LabelValue(addr), ConsulAddressLabel: clientmodel.LabelValue(node.Address), ConsulNodeLabel: clientmodel.LabelValue(node.Node), ConsulTagsLabel: clientmodel.LabelValue(tags), ConsulServiceAddressLabel: clientmodel.LabelValue(node.ServiceAddress), ConsulServicePortLabel: clientmodel.LabelValue(strconv.Itoa(node.ServicePort)), }) } cd.mu.Lock() select { case <-srv.done: cd.mu.Unlock() return default: // Continue. } ch <- srv.tgroup cd.mu.Unlock() } }