diff --git a/retrieval/discovery/azure.go b/retrieval/discovery/azure.go index 2153505a5e..cec54af86e 100644 --- a/retrieval/discovery/azure.go +++ b/retrieval/discovery/azure.go @@ -81,7 +81,6 @@ func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { // Run implements the TargetProvider interface. func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) ticker := time.NewTicker(ad.interval) defer ticker.Stop() @@ -96,7 +95,10 @@ func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGro if err != nil { log.Errorf("unable to refresh during Azure discovery: %s", err) } else { - ch <- []*config.TargetGroup{tg} + select { + case <-ctx.Done(): + case ch <- []*config.TargetGroup{tg}: + } } select { diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 114a1ece4e..e81749b1cd 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -18,7 +18,6 @@ import ( "net" "strconv" "strings" - "sync" "time" consul "github.com/hashicorp/consul/api" @@ -133,12 +132,6 @@ func (cd *Discovery) shouldWatch(name string) bool { // Run implements the TargetProvider interface. func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - var wg sync.WaitGroup - defer func() { - wg.Wait() - close(ch) - }() - // Watched services and their cancelation functions. 
services := map[string]func(){} @@ -204,11 +197,8 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } wctx, cancel := context.WithCancel(ctx) - wg.Add(1) - go func() { - srv.watch(wctx, ch) - wg.Done() - }() + // Keep the watcher in its own goroutine, as before; a synchronous call would presumably block this loop until wctx is canceled. + go srv.watch(wctx, ch) services[name] = cancel } diff --git a/retrieval/discovery/dns/dns.go b/retrieval/discovery/dns/dns.go index b0c9aca971..ca9ed56fc0 100644 --- a/retrieval/discovery/dns/dns.go +++ b/retrieval/discovery/dns/dns.go @@ -90,8 +90,6 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery { // Run implements the TargetProvider interface. func (dd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - ticker := time.NewTicker(dd.interval) defer ticker.Stop() diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 65aa7bac06..1d3d8e112e 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -93,8 +93,6 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { // Run implements the TargetProvider interface. 
func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - ticker := time.NewTicker(ed.interval) defer ticker.Stop() @@ -103,7 +101,11 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup if err != nil { log.Error(err) } else { - ch <- []*config.TargetGroup{tg} + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } } for { @@ -112,8 +114,13 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup tg, err := ed.refresh() if err != nil { log.Error(err) - } else { - ch <- []*config.TargetGroup{tg} + continue + } + + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return } case <-ctx.Done(): return diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index ba7b9be916..ba1ae5b0cf 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -109,7 +109,6 @@ func (fd *FileDiscovery) watchFiles() { // Run implements the TargetProvider interface. func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) defer fd.stop() watcher, err := fsnotify.NewWatcher() @@ -119,47 +118,40 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou } fd.watcher = watcher - fd.refresh(ch) + fd.refresh(ctx, ch) ticker := time.NewTicker(fd.interval) defer ticker.Stop() for { - // Stopping has priority over refreshing. Thus we wrap the actual select - // clause to always catch done signals. select { case <-ctx.Done(): return - default: - select { - case <-ctx.Done(): - return - case event := <-fd.watcher.Events: - // fsnotify sometimes sends a bunch of events without name or operation. - // It's unclear what they are and why they are sent - filter them out. - if len(event.Name) == 0 { - break - } - // Everything but a chmod requires rereading. 
- if event.Op^fsnotify.Chmod == 0 { - break - } - // Changes to a file can spawn various sequences of events with - // different combinations of operations. For all practical purposes - // this is inaccurate. - // The most reliable solution is to reload everything if anything happens. - fd.refresh(ch) + case event := <-fd.watcher.Events: + // fsnotify sometimes sends a bunch of events without name or operation. + // It's unclear what they are and why they are sent - filter them out. + if len(event.Name) == 0 { + break + } + // Everything but a chmod requires rereading. + if event.Op^fsnotify.Chmod == 0 { + break + } + // Changes to a file can spawn various sequences of events with + // different combinations of operations. For all practical purposes + // this is inaccurate. + // The most reliable solution is to reload everything if anything happens. + fd.refresh(ctx, ch) - case <-ticker.C: - // Setting a new watch after an update might fail. Make sure we don't lose - // those files forever. - fd.refresh(ch) + case <-ticker.C: + // Setting a new watch after an update might fail. Make sure we don't lose + // those files forever. + fd.refresh(ctx, ch) - case err := <-fd.watcher.Errors: - if err != nil { - log.Errorf("Error on file watch: %s", err) - } + case err := <-fd.watcher.Errors: + if err != nil { + log.Errorf("Error on file watch: %s", err) } } } @@ -193,7 +185,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. 
-func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { +func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) { t0 := time.Now() defer func() { fileSDScanDuration.Observe(time.Since(t0).Seconds()) @@ -209,7 +201,11 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { ref[p] = fd.lastRefresh[p] continue } - ch <- tgroups + select { + case ch <- tgroups: + case <-ctx.Done(): + return + } ref[p] = len(tgroups) } @@ -218,8 +214,10 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- []*config.TargetGroup{ - {Source: fileSource(f, i)}, + select { + case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}: + case <-ctx.Done(): + return } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 10ff49dbab..9751890679 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -106,11 +106,17 @@ retry: // not try to make sense of it all... drained := make(chan struct{}) go func() { - for tgs := range ch { - // Below we will change the file to a bad syntax. Previously extracted target - // groups must not be deleted via sending an empty target group. - if len(tgs[0].Targets) == 0 { - t.Errorf("Unexpected empty target groups received: %s", tgs) + Loop: + for { + select { + case tgs := <-ch: + // Below we will change the file to a bad syntax. Previously extracted target + // groups must not be deleted via sending an empty target group. 
+ if len(tgs[0].Targets) == 0 { + t.Errorf("Unexpected empty target groups received: %s", tgs) + } + case <-time.After(500 * time.Millisecond): + break Loop } } close(drained) diff --git a/retrieval/discovery/gce.go b/retrieval/discovery/gce.go index d098c5608f..c0f8e311df 100644 --- a/retrieval/discovery/gce.go +++ b/retrieval/discovery/gce.go @@ -108,14 +108,15 @@ func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { // Run implements the TargetProvider interface. func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - // Get an initial set right away. tg, err := gd.refresh() if err != nil { log.Error(err) } else { - ch <- []*config.TargetGroup{tg} + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + } } ticker := time.NewTicker(gd.interval) @@ -127,8 +128,11 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup tg, err := gd.refresh() if err != nil { log.Error(err) - } else { - ch <- []*config.TargetGroup{tg} + continue + } + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): } case <-ctx.Done(): return diff --git a/retrieval/discovery/kubernetes/kubernetes.go b/retrieval/discovery/kubernetes/kubernetes.go index b289c2a98c..3d2d7a54d0 100644 --- a/retrieval/discovery/kubernetes/kubernetes.go +++ b/retrieval/discovery/kubernetes/kubernetes.go @@ -107,8 +107,6 @@ const resyncPeriod = 10 * time.Minute // Run implements the TargetProvider interface. 
func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - rclient := k.client.Core().GetRESTClient() switch k.role { diff --git a/retrieval/discovery/marathon/marathon.go b/retrieval/discovery/marathon/marathon.go index 5e97763a11..6f9668b668 100644 --- a/retrieval/discovery/marathon/marathon.go +++ b/retrieval/discovery/marathon/marathon.go @@ -103,8 +103,6 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { // Run implements the TargetProvider interface. func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - for { select { case <-ctx.Done():