diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go index c0f354230c..34ee5de4de 100644 --- a/discovery/azure/azure.go +++ b/discovery/azure/azure.go @@ -60,17 +60,17 @@ func init() { prometheus.MustRegister(azureSDRefreshFailuresCount) } -// AzureDiscovery periodically performs Azure-SD requests. It implements +// Discovery periodically performs Azure-SD requests. It implements // the TargetProvider interface. -type AzureDiscovery struct { +type Discovery struct { cfg *config.AzureSDConfig interval time.Duration port int } // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. -func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { - return &AzureDiscovery{ +func NewDiscovery(cfg *config.AzureSDConfig) *Discovery { + return &Discovery{ cfg: cfg, interval: time.Duration(cfg.RefreshInterval), port: cfg.Port, @@ -78,8 +78,8 @@ func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { } // Run implements the TargetProvider interface. -func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - ticker := time.NewTicker(ad.interval) +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + ticker := time.NewTicker(d.interval) defer ticker.Stop() for { @@ -89,7 +89,7 @@ func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGro default: } - tg, err := ad.refresh() + tg, err := d.refresh() if err != nil { log.Errorf("unable to refresh during Azure discovery: %s", err) } else { @@ -156,7 +156,7 @@ func newAzureResourceFromID(id string) (azureResource, error) { }, nil } -func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) { +func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { t0 := time.Now() defer func() { azureSDRefreshDuration.Observe(time.Since(t0).Seconds()) @@ -165,7 +165,7 @@ func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) { } }() tg = &config.TargetGroup{} - client, err := createAzureClient(*ad.cfg) + client, err := createAzureClient(*d.cfg) if err != nil { return tg, fmt.Errorf("could not create Azure client: %s", err) } @@ -246,7 +246,7 @@ func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) { for _, ip := range *networkInterface.Properties.IPConfigurations { if ip.Properties.PrivateIPAddress != nil { labels[azureLabelMachinePrivateIP] = model.LabelValue(*ip.Properties.PrivateIPAddress) - address := net.JoinHostPort(*ip.Properties.PrivateIPAddress, fmt.Sprintf("%d", ad.port)) + address := net.JoinHostPort(*ip.Properties.PrivateIPAddress, fmt.Sprintf("%d", d.port)) labels[model.AddressLabel] = model.LabelValue(address) ch <- target{labelSet: labels, err: nil} return diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 05eac61e84..2f487dfdde 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -117,12 +117,12 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) { } // shouldWatch returns whether the service of the given name should be watched. -func (cd *Discovery) shouldWatch(name string) bool { +func (d *Discovery) shouldWatch(name string) bool { // If there's no fixed set of watched services, we watch everything. - if len(cd.watchedServices) == 0 { + if len(d.watchedServices) == 0 { return true } - for _, sn := range cd.watchedServices { + for _, sn := range d.watchedServices { if sn == name { return true } @@ -131,13 +131,13 @@ func (cd *Discovery) shouldWatch(name string) bool { } // Run implements the TargetProvider interface. -func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Watched services and their cancelation functions. services := map[string]func(){} var lastIndex uint64 for { - catalog := cd.client.Catalog() + catalog := d.client.Catalog() t0 := time.Now() srvs, meta, err := catalog.Services(&consul.QueryOptions{ WaitIndex: lastIndex, @@ -167,19 +167,19 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // If the datacenter was not set from clientConf, let's get it from the local Consul agent // (Consul default is to use local node's datacenter if one isn't given for a query). - if cd.clientDatacenter == "" { - info, err := cd.client.Agent().Self() + if d.clientDatacenter == "" { + info, err := d.client.Agent().Self() if err != nil { log.Errorf("Error retrieving datacenter name: %s", err) time.Sleep(retryInterval) continue } - cd.clientDatacenter = info["Config"]["Datacenter"].(string) + d.clientDatacenter = info["Config"]["Datacenter"].(string) } // Check for new services. for name := range srvs { - if !cd.shouldWatch(name) { + if !d.shouldWatch(name) { continue } if _, ok := services[name]; ok { @@ -187,13 +187,13 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } srv := &consulService{ - client: cd.client, + client: d.client, name: name, labels: model.LabelSet{ serviceLabel: model.LabelValue(name), - datacenterLabel: model.LabelValue(cd.clientDatacenter), + datacenterLabel: model.LabelValue(d.clientDatacenter), }, - tagSeparator: cd.tagSeparator, + tagSeparator: d.tagSeparator, } wctx, cancel := context.WithCancel(ctx) diff --git a/discovery/dns/dns.go b/discovery/dns/dns.go index 9890de4aee..d5d506dedc 100644 --- a/discovery/dns/dns.go +++ b/discovery/dns/dns.go @@ -64,7 +64,6 @@ type Discovery struct { names []string interval time.Duration - m sync.RWMutex port int qtype uint16 } @@ -89,30 +88,30 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery { } // Run implements the TargetProvider interface. -func (dd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - ticker := time.NewTicker(dd.interval) +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + ticker := time.NewTicker(d.interval) defer ticker.Stop() // Get an initial set right away. - dd.refreshAll(ctx, ch) + d.refreshAll(ctx, ch) for { select { case <-ticker.C: - dd.refreshAll(ctx, ch) + d.refreshAll(ctx, ch) case <-ctx.Done(): return } } } -func (dd *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGroup) { var wg sync.WaitGroup - wg.Add(len(dd.names)) - for _, name := range dd.names { + wg.Add(len(d.names)) + for _, name := range d.names { go func(n string) { - if err := dd.refresh(ctx, n, ch); err != nil { + if err := d.refresh(ctx, n, ch); err != nil { log.Errorf("Error refreshing DNS targets: %s", err) } wg.Done() @@ -122,8 +121,8 @@ func (dd *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetG wg.Wait() } -func (dd *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error { - response, err := lookupAll(name, dd.qtype) +func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error { + response, err := lookupAll(name, d.qtype) dnsSDLookupsCount.Inc() if err != nil { dnsSDLookupFailuresCount.Inc() @@ -144,9 +143,9 @@ func (dd *Discovery) refresh(ctx context.Context, name string, ch chan<- []*conf target = hostPort(addr.Target, int(addr.Port)) case *dns.A: - target = hostPort(addr.A.String(), dd.port) + target = hostPort(addr.A.String(), d.port) case *dns.AAAA: - target = hostPort(addr.AAAA.String(), dd.port) + target = hostPort(addr.AAAA.String(), d.port) default: log.Warnf("%q is not a valid SRV record", record) continue diff --git a/discovery/ec2/ec2.go b/discovery/ec2/ec2.go index 2d4138be3d..56fa07f9a3 100644 --- a/discovery/ec2/ec2.go +++ b/discovery/ec2/ec2.go @@ -65,9 +65,9 @@ func init() { prometheus.MustRegister(ec2SDRefreshDuration) } -// EC2Discovery periodically performs EC2-SD requests. It implements +// Discovery periodically performs EC2-SD requests. It implements // the TargetProvider interface. -type EC2Discovery struct { +type Discovery struct { aws *aws.Config interval time.Duration profile string @@ -75,12 +75,12 @@ type EC2Discovery struct { } // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. -func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery { +func NewDiscovery(conf *config.EC2SDConfig) *Discovery { creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "") if conf.AccessKey == "" && conf.SecretKey == "" { creds = nil } - return &EC2Discovery{ + return &Discovery{ aws: &aws.Config{ Region: &conf.Region, Credentials: creds, @@ -92,12 +92,12 @@ func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery { } // Run implements the TargetProvider interface. -func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - ticker := time.NewTicker(ed.interval) +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + ticker := time.NewTicker(d.interval) defer ticker.Stop() // Get an initial set right away. - tg, err := ed.refresh() + tg, err := d.refresh() if err != nil { log.Error(err) } else { @@ -111,7 +111,7 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup for { select { case <-ticker.C: - tg, err := ed.refresh() + tg, err := d.refresh() if err != nil { log.Error(err) continue @@ -128,7 +128,7 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup } } -func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) { +func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { t0 := time.Now() defer func() { ec2SDRefreshDuration.Observe(time.Since(t0).Seconds()) @@ -138,8 +138,8 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) { }() sess, err := session.NewSessionWithOptions(session.Options{ - Config: *ed.aws, - Profile: ed.profile, + Config: *d.aws, + Profile: d.profile, }) if err != nil { return nil, fmt.Errorf("could not create aws session: %s", err) @@ -147,7 +147,7 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) { ec2s := ec2.New(sess) tg = &config.TargetGroup{ - Source: *ed.aws.Region, + Source: *d.aws.Region, } if err = ec2s.DescribeInstancesPages(nil, func(p *ec2.DescribeInstancesOutput, lastPage bool) bool { for _, r := range p.Reservations { @@ -159,7 +159,7 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) { ec2LabelInstanceID: model.LabelValue(*inst.InstanceId), } labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress) - addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", ed.port)) + addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", d.port)) labels[model.AddressLabel] = model.LabelValue(addr) if inst.PublicIpAddress != nil { diff --git a/discovery/file/file.go b/discovery/file/file.go index bcfa4db871..7e4d11eb42 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -51,10 +51,10 @@ func init() { prometheus.MustRegister(fileSDReadErrorsCount) } -// FileDiscovery provides service discovery functionality based +// Discovery provides service discovery functionality based // on files that contain target groups in JSON or YAML format. Refreshing // happens using file watches and periodic refreshes. -type FileDiscovery struct { +type Discovery struct { paths []string watcher *fsnotify.Watcher interval time.Duration @@ -66,17 +66,17 @@ type FileDiscovery struct { } // NewDiscovery returns a new file discovery for the given paths. -func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery { - return &FileDiscovery{ +func NewDiscovery(conf *config.FileSDConfig) *Discovery { + return &Discovery{ paths: conf.Files, interval: time.Duration(conf.RefreshInterval), } } // listFiles returns a list of all files that match the configured patterns. -func (fd *FileDiscovery) listFiles() []string { +func (d *Discovery) listFiles() []string { var paths []string - for _, p := range fd.paths { + for _, p := range d.paths { files, err := filepath.Glob(p) if err != nil { log.Errorf("Error expanding glob %q: %s", p, err) @@ -89,36 +89,36 @@ func (fd *FileDiscovery) listFiles() []string { // watchFiles sets watches on all full paths or directories that were configured for // this file discovery. -func (fd *FileDiscovery) watchFiles() { - if fd.watcher == nil { +func (d *Discovery) watchFiles() { + if d.watcher == nil { panic("no watcher configured") } - for _, p := range fd.paths { + for _, p := range d.paths { if idx := strings.LastIndex(p, "/"); idx > -1 { p = p[:idx] } else { p = "./" } - if err := fd.watcher.Add(p); err != nil { + if err := d.watcher.Add(p); err != nil { log.Errorf("Error adding file watch for %q: %s", p, err) } } } // Run implements the TargetProvider interface. -func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer fd.stop() +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + defer d.stop() watcher, err := fsnotify.NewWatcher() if err != nil { log.Errorf("Error creating file watcher: %s", err) return } - fd.watcher = watcher + d.watcher = watcher - fd.refresh(ctx, ch) + d.refresh(ctx, ch) - ticker := time.NewTicker(fd.interval) + ticker := time.NewTicker(d.interval) defer ticker.Stop() for { @@ -126,7 +126,7 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou case <-ctx.Done(): return - case event := <-fd.watcher.Events: + case event := <-d.watcher.Events: // fsnotify sometimes sends a bunch of events without name or operation. // It's unclear what they are and why they are sent - filter them out. if len(event.Name) == 0 { @@ -140,14 +140,14 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou // different combinations of operations. For all practical purposes // this is inaccurate. // The most reliable solution is to reload everything if anything happens. - fd.refresh(ctx, ch) + d.refresh(ctx, ch) case <-ticker.C: // Setting a new watch after an update might fail. Make sure we don't lose // those files forever. - fd.refresh(ctx, ch) + d.refresh(ctx, ch) - case err := <-fd.watcher.Errors: + case err := <-d.watcher.Errors: if err != nil { log.Errorf("Error on file watch: %s", err) } @@ -156,8 +156,8 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou } // stop shuts down the file watcher. -func (fd *FileDiscovery) stop() { - log.Debugf("Stopping file discovery for %s...", fd.paths) +func (d *Discovery) stop() { + log.Debugf("Stopping file discovery for %s...", d.paths) done := make(chan struct{}) defer close(done) @@ -166,37 +166,37 @@ func (fd *FileDiscovery) stop() { go func() { for { select { - case <-fd.watcher.Errors: - case <-fd.watcher.Events: + case <-d.watcher.Errors: + case <-d.watcher.Events: // Drain all events and errors. case <-done: return } } }() - if err := fd.watcher.Close(); err != nil { - log.Errorf("Error closing file watcher for %s: %s", fd.paths, err) + if err := d.watcher.Close(); err != nil { + log.Errorf("Error closing file watcher for %s: %s", d.paths, err) } - log.Debugf("File discovery for %s stopped.", fd.paths) + log.Debugf("File discovery for %s stopped.", d.paths) } // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) { t0 := time.Now() defer func() { fileSDScanDuration.Observe(time.Since(t0).Seconds()) }() ref := map[string]int{} - for _, p := range fd.listFiles() { + for _, p := range d.listFiles() { tgroups, err := readFile(p) if err != nil { fileSDReadErrorsCount.Inc() log.Errorf("Error reading file %q: %s", p, err) // Prevent deletion down below. - ref[p] = fd.lastRefresh[p] + ref[p] = d.lastRefresh[p] continue } select { @@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.Target ref[p] = len(tgroups) } // Send empty updates for sources that disappeared. - for f, n := range fd.lastRefresh { + for f, n := range d.lastRefresh { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { @@ -220,9 +220,9 @@ func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.Target } } } - fd.lastRefresh = ref + d.lastRefresh = ref - fd.watchFiles() + d.watchFiles() } // fileSource returns a source ID for the i-th target group in the file. diff --git a/discovery/gce/gce.go b/discovery/gce/gce.go index 1161e4e4b0..44ac349fea 100644 --- a/discovery/gce/gce.go +++ b/discovery/gce/gce.go @@ -44,9 +44,6 @@ const ( gceLabelInstanceStatus = gceLabel + "instance_status" gceLabelTags = gceLabel + "tags" gceLabelMetadata = gceLabel + "metadata_" - - // Constants for instrumentation. - namespace = "prometheus" ) var ( @@ -67,9 +64,9 @@ func init() { prometheus.MustRegister(gceSDRefreshDuration) } -// GCEDiscovery periodically performs GCE-SD requests. It implements +// Discovery periodically performs GCE-SD requests. It implements // the TargetProvider interface. -type GCEDiscovery struct { +type Discovery struct { project string zone string filter string @@ -81,9 +78,9 @@ type GCEDiscovery struct { tagSeparator string } -// NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets. -func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { - gd := &GCEDiscovery{ +// NewDiscovery returns a new Discovery which periodically refreshes its targets. +func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) { + gd := &Discovery{ project: conf.Project, zone: conf.Zone, filter: conf.Filter, @@ -105,9 +102,9 @@ func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { } // Run implements the TargetProvider interface. -func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Get an initial set right away. - tg, err := gd.refresh() + tg, err := d.refresh() if err != nil { log.Error(err) } else { @@ -117,13 +114,13 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup } } - ticker := time.NewTicker(gd.interval) + ticker := time.NewTicker(d.interval) defer ticker.Stop() for { select { case <-ticker.C: - tg, err := gd.refresh() + tg, err := d.refresh() if err != nil { log.Error(err) continue @@ -138,7 +135,7 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup } } -func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) { +func (d *Discovery) refresh() (tg *config.TargetGroup, err error) { t0 := time.Now() defer func() { gceSDRefreshDuration.Observe(time.Since(t0).Seconds()) @@ -148,12 +145,12 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) { }() tg = &config.TargetGroup{ - Source: fmt.Sprintf("GCE_%s_%s", gd.project, gd.zone), + Source: fmt.Sprintf("GCE_%s_%s", d.project, d.zone), } - ilc := gd.isvc.List(gd.project, gd.zone) - if len(gd.filter) > 0 { - ilc = ilc.Filter(gd.filter) + ilc := d.isvc.List(d.project, d.zone) + if len(d.filter) > 0 { + ilc = ilc.Filter(d.filter) } err = ilc.Pages(nil, func(l *compute.InstanceList) error { for _, inst := range l.Items { @@ -161,7 +158,7 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) { continue } labels := model.LabelSet{ - gceLabelProject: model.LabelValue(gd.project), + gceLabelProject: model.LabelValue(d.project), gceLabelZone: model.LabelValue(inst.Zone), gceLabelInstanceName: model.LabelValue(inst.Name), gceLabelInstanceStatus: model.LabelValue(inst.Status), @@ -170,14 +167,14 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) { labels[gceLabelNetwork] = model.LabelValue(priIface.Network) labels[gceLabelSubnetwork] = model.LabelValue(priIface.Subnetwork) labels[gceLabelPrivateIP] = model.LabelValue(priIface.NetworkIP) - addr := fmt.Sprintf("%s:%d", priIface.NetworkIP, gd.port) + addr := fmt.Sprintf("%s:%d", priIface.NetworkIP, d.port) labels[model.AddressLabel] = model.LabelValue(addr) // Tags in GCE are usually only used for networking rules. if inst.Tags != nil && len(inst.Tags.Items) > 0 { // We surround the separated list with the separator as well. This way regular expressions // in relabeling rules don't have to consider tag positions. - tags := gd.tagSeparator + strings.Join(inst.Tags.Items, gd.tagSeparator) + gd.tagSeparator + tags := d.tagSeparator + strings.Join(inst.Tags.Items, d.tagSeparator) + d.tagSeparator labels[gceLabelTags] = model.LabelValue(tags) } diff --git a/discovery/kubernetes/endpoints_test.go b/discovery/kubernetes/endpoints_test.go index 296249f7e7..de77dafc81 100644 --- a/discovery/kubernetes/endpoints_test.go +++ b/discovery/kubernetes/endpoints_test.go @@ -45,33 +45,33 @@ func makeEndpoints() *v1.Endpoints { Namespace: "default", }, Subsets: []v1.EndpointSubset{ - v1.EndpointSubset{ + { Addresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "1.2.3.4", }, }, Ports: []v1.EndpointPort{ - v1.EndpointPort{ + { Name: "testport", Port: 9000, Protocol: v1.ProtocolTCP, }, }, }, - v1.EndpointSubset{ + { Addresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "2.3.4.5", }, }, NotReadyAddresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "2.3.4.5", }, }, Ports: []v1.EndpointPort{ - v1.EndpointPort{ + { Name: "testport", Port: 9001, Protocol: v1.ProtocolTCP, @@ -89,21 +89,21 @@ func TestEndpointsDiscoveryInitial(t *testing.T) { k8sDiscoveryTest{ discovery: n, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_ready": "true", }, - model.LabelSet{ + { "__address__": "2.3.4.5:9001", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_ready": "true", }, - model.LabelSet{ + { "__address__": "2.3.4.5:9001", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", @@ -130,20 +130,20 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { Spec: v1.PodSpec{ NodeName: "testnode", Containers: []v1.Container{ - v1.Container{ + { Name: "c1", Ports: []v1.ContainerPort{ - v1.ContainerPort{ + { Name: "mainport", ContainerPort: 9000, Protocol: v1.ProtocolTCP, }, }, }, - v1.Container{ + { Name: "c2", Ports: []v1.ContainerPort{ - v1.ContainerPort{ + { Name: "sideport", ContainerPort: 9001, Protocol: v1.ProtocolTCP, @@ -169,9 +169,9 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { Namespace: "default", }, Subsets: []v1.EndpointSubset{ - v1.EndpointSubset{ + { Addresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "4.3.2.1", TargetRef: &v1.ObjectReference{ Kind: "Pod", @@ -181,7 +181,7 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { }, }, Ports: []v1.EndpointPort{ - v1.EndpointPort{ + { Name: "testport", Port: 9000, Protocol: v1.ProtocolTCP, @@ -194,9 +194,9 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "4.3.2.1:9000", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", @@ -211,7 +211,7 @@ func TestEndpointsDiscoveryAdd(t *testing.T) { "__meta_kubernetes_pod_container_port_number": "9000", "__meta_kubernetes_pod_container_port_protocol": "TCP", }, - model.LabelSet{ + { "__address__": "1.2.3.4:9001", "__meta_kubernetes_pod_name": "testpod", "__meta_kubernetes_pod_ip": "1.2.3.4", @@ -242,7 +242,7 @@ func TestEndpointsDiscoveryDelete(t *testing.T) { discovery: n, afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "endpoints/default/testendpoints", }, }, @@ -257,7 +257,7 @@ func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) { discovery: n, afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "endpoints/default/testendpoints", }, }, @@ -278,28 +278,28 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) { Namespace: "default", }, Subsets: []v1.EndpointSubset{ - v1.EndpointSubset{ + { Addresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "1.2.3.4", }, }, Ports: []v1.EndpointPort{ - v1.EndpointPort{ + { Name: "testport", Port: 9000, Protocol: v1.ProtocolTCP, }, }, }, - v1.EndpointSubset{ + { Addresses: []v1.EndpointAddress{ - v1.EndpointAddress{ + { IP: "2.3.4.5", }, }, Ports: []v1.EndpointPort{ - v1.EndpointPort{ + { Name: "testport", Port: 9001, Protocol: v1.ProtocolTCP, @@ -311,15 +311,15 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) { }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_ready": "true", }, - model.LabelSet{ + { "__address__": "2.3.4.5:9001", "__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_protocol": "TCP", diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 768e649781..8e06b83626 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -59,9 +59,9 @@ func init() { } } -// Kubernetes implements the TargetProvider interface for discovering +// Discovery implements the TargetProvider interface for discovering // targets from Kubernetes. -type Kubernetes struct { +type Discovery struct { client kubernetes.Interface role config.KubernetesRole logger log.Logger @@ -76,7 +76,7 @@ func init() { } // New creates a new Kubernetes discovery for the given role. -func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) { +func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) { var ( kcfg *rest.Config err error @@ -136,7 +136,7 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) { if err != nil { return nil, err } - return &Kubernetes{ + return &Discovery{ client: c, logger: l, role: conf.Role, @@ -146,16 +146,16 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) { const resyncPeriod = 10 * time.Minute // Run implements the TargetProvider interface. -func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - rclient := k.client.Core().GetRESTClient() +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { + rclient := d.client.Core().GetRESTClient() - switch k.role { + switch d.role { case "endpoints": elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil) slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) eps := NewEndpoints( - k.logger.With("kubernetes_sd", "endpoint"), + d.logger.With("kubernetes_sd", "endpoint"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), @@ -178,7 +178,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case "pod": plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) pod := NewPod( - k.logger.With("kubernetes_sd", "pod"), + d.logger.With("kubernetes_sd", "pod"), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), ) go pod.informer.Run(ctx.Done()) @@ -191,7 +191,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case "service": slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) svc := NewService( - k.logger.With("kubernetes_sd", "service"), + d.logger.With("kubernetes_sd", "service"), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), ) go svc.informer.Run(ctx.Done()) @@ -204,7 +204,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case "node": nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) node := NewNode( - k.logger.With("kubernetes_sd", "node"), + d.logger.With("kubernetes_sd", "node"), cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod), ) go node.informer.Run(ctx.Done()) @@ -215,7 +215,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { node.Run(ctx, ch) default: - k.logger.Errorf("unknown Kubernetes discovery kind %q", k.role) + d.logger.Errorf("unknown Kubernetes discovery kind %q", d.role) } <-ctx.Done() diff --git a/discovery/kubernetes/node_test.go b/discovery/kubernetes/node_test.go index 9f78f7b02a..16fa1558f8 100644 --- a/discovery/kubernetes/node_test.go +++ b/discovery/kubernetes/node_test.go @@ -167,7 +167,7 @@ func makeNode(name, address string, labels map[string]string, annotations map[st }, Status: v1.NodeStatus{ Addresses: []v1.NodeAddress{ - v1.NodeAddress{ + { Type: v1.NodeInternalIP, Address: address, }, @@ -197,9 +197,9 @@ func TestNodeDiscoveryInitial(t *testing.T) { k8sDiscoveryTest{ discovery: n, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", @@ -223,9 +223,9 @@ func TestNodeDiscoveryAdd(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Add(makeEnumeratedNode(1)) }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test1", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", @@ -248,9 +248,9 @@ func TestNodeDiscoveryDelete(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(makeEnumeratedNode(0)) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test0", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", @@ -263,7 +263,7 @@ func TestNodeDiscoveryDelete(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "node/test0", }, }, @@ -278,9 +278,9 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test0", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", @@ -293,7 +293,7 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "node/test0", }, }, @@ -319,9 +319,9 @@ func TestNodeDiscoveryUpdate(t *testing.T) { }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test0", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", @@ -334,9 +334,9 @@ func TestNodeDiscoveryUpdate(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:10250", "instance": "test0", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4", diff --git a/discovery/kubernetes/pod_test.go b/discovery/kubernetes/pod_test.go index c683e825b6..2b6ab3b895 100644 --- a/discovery/kubernetes/pod_test.go +++ b/discovery/kubernetes/pod_test.go @@ -47,22 +47,22 @@ func makeMultiPortPod() *v1.Pod { Spec: v1.PodSpec{ NodeName: "testnode", Containers: []v1.Container{ - v1.Container{ + { Name: "testcontainer0", Ports: []v1.ContainerPort{ - v1.ContainerPort{ + { Name: "testport0", Protocol: v1.ProtocolTCP, ContainerPort: int32(9000), }, - v1.ContainerPort{ + { Name: "testport1", Protocol: v1.ProtocolUDP, ContainerPort: int32(9001), }, }, }, - v1.Container{ + { Name: "testcontainer1", }, }, @@ -71,7 +71,7 @@ func makeMultiPortPod() *v1.Pod { PodIP: "1.2.3.4", HostIP: "2.3.4.5", Conditions: []v1.PodCondition{ - v1.PodCondition{ + { Type: v1.PodReady, Status: v1.ConditionTrue, }, @@ -89,10 +89,10 @@ func makePod() *v1.Pod { Spec: v1.PodSpec{ NodeName: "testnode", Containers: []v1.Container{ - v1.Container{ + { Name: "testcontainer", Ports: []v1.ContainerPort{ - v1.ContainerPort{ + { Name: "testport", Protocol: v1.ProtocolTCP, ContainerPort: int32(9000), @@ -105,7 +105,7 @@ func makePod() *v1.Pod { PodIP: "1.2.3.4", HostIP: "2.3.4.5", Conditions: []v1.PodCondition{ - v1.PodCondition{ + { Type: v1.PodReady, Status: v1.ConditionTrue, }, @@ -121,23 +121,23 @@ func TestPodDiscoveryInitial(t *testing.T) { k8sDiscoveryTest{ discovery: n, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer0", "__meta_kubernetes_pod_container_port_name": "testport0", "__meta_kubernetes_pod_container_port_number": "9000", "__meta_kubernetes_pod_container_port_protocol": "TCP", }, - model.LabelSet{ + { "__address__": "1.2.3.4:9001", "__meta_kubernetes_pod_container_name": "testcontainer0", "__meta_kubernetes_pod_container_port_name": "testport1", "__meta_kubernetes_pod_container_port_number": "9001", "__meta_kubernetes_pod_container_port_protocol": "UDP", }, - model.LabelSet{ + { "__address__": "1.2.3.4", "__meta_kubernetes_pod_container_name": "testcontainer1", }, @@ -165,9 +165,9 @@ func TestPodDiscoveryAdd(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Add(makePod()) }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_port_name": "testport", @@ -197,9 +197,9 @@ func TestPodDiscoveryDelete(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(makePod()) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_port_name": "testport", @@ -219,7 +219,7 @@ func TestPodDiscoveryDelete(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "pod/default/testpod", }, }, @@ -234,9 +234,9 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_port_name": "testport", @@ -256,7 +256,7 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "pod/default/testpod", }, }, @@ -273,10 +273,10 @@ func TestPodDiscoveryUpdate(t *testing.T) { Spec: v1.PodSpec{ NodeName: "testnode", Containers: []v1.Container{ - v1.Container{ + { Name: "testcontainer", Ports: []v1.ContainerPort{ - v1.ContainerPort{ + { Name: "testport", Protocol: v1.ProtocolTCP, ContainerPort: int32(9000), @@ -295,9 +295,9 @@ func TestPodDiscoveryUpdate(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Update(makePod()) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_port_name": "testport", @@ -317,9 +317,9 @@ func TestPodDiscoveryUpdate(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__address__": "1.2.3.4:9000", "__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_port_name": "testport", diff --git a/discovery/kubernetes/service_test.go b/discovery/kubernetes/service_test.go index fc4fce73b1..bfcf191717 100644 --- a/discovery/kubernetes/service_test.go +++ b/discovery/kubernetes/service_test.go @@ -47,12 +47,12 @@ func makeMultiPortService() *v1.Service { }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ - v1.ServicePort{ + { Name: "testport0", Protocol: v1.ProtocolTCP, Port: int32(30900), }, - v1.ServicePort{ + { Name: "testport1", Protocol: v1.ProtocolUDP, Port: int32(30901), @@ -70,7 +70,7 @@ func makeSuffixedService(suffix string) *v1.Service { }, Spec: v1.ServiceSpec{ Ports: []v1.ServicePort{ - v1.ServicePort{ + { Name: "testport", Protocol: v1.ProtocolTCP, Port: int32(30900), @@ -91,14 +91,14 @@ func TestServiceDiscoveryInitial(t *testing.T) { k8sDiscoveryTest{ discovery: n, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport0", }, - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "UDP", "__address__": "testservice.default.svc:30901", "__meta_kubernetes_service_port_name": "testport1", @@ -123,9 +123,9 @@ func TestServiceDiscoveryAdd(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Add(makeService()) }() }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport", @@ -149,9 +149,9 @@ func TestServiceDiscoveryDelete(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(makeService()) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport", @@ -165,7 +165,7 @@ func TestServiceDiscoveryDelete(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "svc/default/testservice", }, }, @@ -180,9 +180,9 @@ func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport", @@ -196,7 +196,7 @@ func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Source: "svc/default/testservice", }, }, @@ -211,9 +211,9 @@ func TestServiceDiscoveryUpdate(t *testing.T) { discovery: n, afterStart: func() { go func() { i.Update(makeMultiPortService()) }() }, expectedInitial: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport", @@ -227,14 +227,14 @@ func TestServiceDiscoveryUpdate(t *testing.T) { }, }, expectedRes: []*config.TargetGroup{ - &config.TargetGroup{ + { Targets: []model.LabelSet{ - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "TCP", "__address__": "testservice.default.svc:30900", "__meta_kubernetes_service_port_name": "testport0", }, - model.LabelSet{ + { "__meta_kubernetes_service_port_protocol": "UDP", "__address__": "testservice.default.svc:30901", "__meta_kubernetes_service_port_name": "testport1", diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go index 2b7db9f993..75db582bf2 100644 --- a/discovery/marathon/marathon.go +++ b/discovery/marathon/marathon.go @@ -81,7 +81,7 @@ type Discovery struct { token string } -// Initialize sets up the discovery for usage. +// NewDiscovery returns a new Marathon Discovery. func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { tls, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { @@ -114,13 +114,13 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { } // Run implements the TargetProvider interface. -func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { for { select { case <-ctx.Done(): return - case <-time.After(md.refreshInterval): - err := md.updateServices(ctx, ch) + case <-time.After(d.refreshInterval): + err := d.updateServices(ctx, ch) if err != nil { log.Errorf("Error while updating services: %s", err) } @@ -128,7 +128,7 @@ func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } } -func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.TargetGroup) (err error) { +func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.TargetGroup) (err error) { t0 := time.Now() defer func() { refreshDuration.Observe(time.Since(t0).Seconds()) @@ -137,7 +137,7 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar } }() - targetMap, err := md.fetchTargetGroups() + targetMap, err := d.fetchTargetGroups() if err != nil { return err } @@ -154,7 +154,7 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar } // Remove services which did disappear. - for source := range md.lastRefresh { + for source := range d.lastRefresh { _, ok := targetMap[source] if !ok { select { @@ -166,13 +166,13 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar } } - md.lastRefresh = targetMap + d.lastRefresh = targetMap return nil } -func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { - url := RandomAppsURL(md.servers) - apps, err := md.appsClient(md.client, url, md.token) +func (d *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { + url := RandomAppsURL(d.servers) + apps, err := d.appsClient(d.client, url, d.token) if err != nil { return nil, err } diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index eb11f0b98f..205eff7262 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -29,7 +29,9 @@ import ( "github.com/prometheus/prometheus/util/treecache" ) -type ZookeeperDiscovery struct { +// Discovery implements the TargetProvider interface for discovering +// targets from Zookeeper. +type Discovery struct { conn *zk.Conn sources map[string]*config.TargetGroup @@ -40,13 +42,13 @@ type ZookeeperDiscovery struct { parse func(data []byte, path string) (model.LabelSet, error) } -// NewNerveDiscovery returns a new NerveDiscovery for the given config. -func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery { +// NewNerveDiscovery returns a new Discovery for the given Nerve config. +func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery { return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) } -// NewServersetDiscovery returns a new ServersetDiscovery for the given config. -func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery { +// NewServersetDiscovery returns a new Discovery for the given serverset config. +func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery { return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) } @@ -57,14 +59,14 @@ func NewDiscovery( timeout time.Duration, paths []string, pf func(data []byte, path string) (model.LabelSet, error), -) *ZookeeperDiscovery { - conn, _, err := zk.Connect(srvs, time.Duration(timeout)) +) *Discovery { + conn, _, err := zk.Connect(srvs, timeout) conn.SetLogger(treecache.ZookeeperLogger{}) if err != nil { return nil } updates := make(chan treecache.ZookeeperTreeCacheEvent) - sd := &ZookeeperDiscovery{ + sd := &Discovery{ conn: conn, updates: updates, sources: map[string]*config.TargetGroup{}, @@ -77,34 +79,34 @@ func NewDiscovery( } // Run implements the TargetProvider interface. -func (sd *ZookeeperDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { +func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { defer func() { - for _, tc := range sd.treeCaches { + for _, tc := range d.treeCaches { tc.Stop() } // Drain event channel in case the treecache leaks goroutines otherwise. - for range sd.updates { + for range d.updates { } - sd.conn.Close() + d.conn.Close() }() for { select { case <-ctx.Done(): - case event := <-sd.updates: + case event := <-d.updates: tg := &config.TargetGroup{ Source: event.Path, } if event.Data != nil { - labelSet, err := sd.parse(*event.Data, event.Path) + labelSet, err := d.parse(*event.Data, event.Path) if err == nil { tg.Targets = []model.LabelSet{labelSet} - sd.sources[event.Path] = tg + d.sources[event.Path] = tg } else { - delete(sd.sources, event.Path) + delete(d.sources, event.Path) } } else { - delete(sd.sources, event.Path) + delete(d.sources, event.Path) } select { case <-ctx.Done():