From b4d7ce1370fbd6af0786d28882f13fb5830b25a6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 18 Nov 2016 10:55:29 +0100 Subject: [PATCH] discovery: respect context cancellation everywhere This also removes closing of the target group channel everywhere as the contexts cancels across all stages and we don't care about draining all events once that happened. --- retrieval/discovery/azure.go | 6 +- retrieval/discovery/consul/consul.go | 13 +--- retrieval/discovery/dns/dns.go | 2 - retrieval/discovery/ec2.go | 17 +++-- retrieval/discovery/file.go | 68 ++++++++++---------- retrieval/discovery/file_test.go | 16 +++-- retrieval/discovery/gce.go | 14 ++-- retrieval/discovery/kubernetes/kubernetes.go | 2 - retrieval/discovery/marathon/marathon.go | 2 - 9 files changed, 70 insertions(+), 70 deletions(-) diff --git a/retrieval/discovery/azure.go b/retrieval/discovery/azure.go index 2153505a5e..cec54af86e 100644 --- a/retrieval/discovery/azure.go +++ b/retrieval/discovery/azure.go @@ -81,7 +81,6 @@ func NewAzureDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { // Run implements the TargetProvider interface. func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) ticker := time.NewTicker(ad.interval) defer ticker.Stop() @@ -96,7 +95,10 @@ func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGro if err != nil { log.Errorf("unable to refresh during Azure discovery: %s", err) } else { - ch <- []*config.TargetGroup{tg} + select { + case <-ctx.Done(): + case ch <- []*config.TargetGroup{tg}: + } } select { diff --git a/retrieval/discovery/consul/consul.go b/retrieval/discovery/consul/consul.go index 114a1ece4e..e81749b1cd 100644 --- a/retrieval/discovery/consul/consul.go +++ b/retrieval/discovery/consul/consul.go @@ -18,7 +18,6 @@ import ( "net" "strconv" "strings" - "sync" "time" consul "github.com/hashicorp/consul/api" @@ -133,12 +132,6 @@ func (cd *Discovery) shouldWatch(name string) bool { // Run implements the TargetProvider interface. func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - var wg sync.WaitGroup - defer func() { - wg.Wait() - close(ch) - }() - // Watched services and their cancelation functions. services := map[string]func(){} @@ -204,11 +197,7 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } wctx, cancel := context.WithCancel(ctx) - wg.Add(1) - go func() { - srv.watch(wctx, ch) - wg.Done() - }() + srv.watch(wctx, ch) services[name] = cancel } diff --git a/retrieval/discovery/dns/dns.go b/retrieval/discovery/dns/dns.go index b0c9aca971..ca9ed56fc0 100644 --- a/retrieval/discovery/dns/dns.go +++ b/retrieval/discovery/dns/dns.go @@ -90,8 +90,6 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery { // Run implements the TargetProvider interface. func (dd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - ticker := time.NewTicker(dd.interval) defer ticker.Stop() diff --git a/retrieval/discovery/ec2.go b/retrieval/discovery/ec2.go index 65aa7bac06..1d3d8e112e 100644 --- a/retrieval/discovery/ec2.go +++ b/retrieval/discovery/ec2.go @@ -93,8 +93,6 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery { // Run implements the TargetProvider interface. func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - ticker := time.NewTicker(ed.interval) defer ticker.Stop() @@ -103,7 +101,11 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup if err != nil { log.Error(err) } else { - ch <- []*config.TargetGroup{tg} + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } } for { @@ -112,8 +114,13 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup tg, err := ed.refresh() if err != nil { log.Error(err) - } else { - ch <- []*config.TargetGroup{tg} + continue + } + + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return } case <-ctx.Done(): return diff --git a/retrieval/discovery/file.go b/retrieval/discovery/file.go index ba7b9be916..ba1ae5b0cf 100644 --- a/retrieval/discovery/file.go +++ b/retrieval/discovery/file.go @@ -109,7 +109,6 @@ func (fd *FileDiscovery) watchFiles() { // Run implements the TargetProvider interface. func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) defer fd.stop() watcher, err := fsnotify.NewWatcher() @@ -119,47 +118,40 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou } fd.watcher = watcher - fd.refresh(ch) + fd.refresh(ctx, ch) ticker := time.NewTicker(fd.interval) defer ticker.Stop() for { - // Stopping has priority over refreshing. Thus we wrap the actual select - // clause to always catch done signals. select { case <-ctx.Done(): return - default: - select { - case <-ctx.Done(): - return - case event := <-fd.watcher.Events: - // fsnotify sometimes sends a bunch of events without name or operation. - // It's unclear what they are and why they are sent - filter them out. - if len(event.Name) == 0 { - break - } - // Everything but a chmod requires rereading. - if event.Op^fsnotify.Chmod == 0 { - break - } - // Changes to a file can spawn various sequences of events with - // different combinations of operations. For all practical purposes - // this is inaccurate. - // The most reliable solution is to reload everything if anything happens. - fd.refresh(ch) + case event := <-fd.watcher.Events: + // fsnotify sometimes sends a bunch of events without name or operation. + // It's unclear what they are and why they are sent - filter them out. + if len(event.Name) == 0 { + break + } + // Everything but a chmod requires rereading. + if event.Op^fsnotify.Chmod == 0 { + break + } + // Changes to a file can spawn various sequences of events with + // different combinations of operations. For all practical purposes + // this is inaccurate. + // The most reliable solution is to reload everything if anything happens. + fd.refresh(ctx, ch) - case <-ticker.C: - // Setting a new watch after an update might fail. Make sure we don't lose - // those files forever. - fd.refresh(ch) + case <-ticker.C: + // Setting a new watch after an update might fail. Make sure we don't lose + // those files forever. + fd.refresh(ctx, ch) - case err := <-fd.watcher.Errors: - if err != nil { - log.Errorf("Error on file watch: %s", err) - } + case err := <-fd.watcher.Errors: + if err != nil { + log.Errorf("Error on file watch: %s", err) } } } @@ -193,7 +185,7 @@ func (fd *FileDiscovery) stop() { // refresh reads all files matching the discovery's patterns and sends the respective // updated target groups through the channel. -func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { +func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) { t0 := time.Now() defer func() { fileSDScanDuration.Observe(time.Since(t0).Seconds()) @@ -209,7 +201,11 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { ref[p] = fd.lastRefresh[p] continue } - ch <- tgroups + select { + case ch <- tgroups: + case <-ctx.Done(): + return + } ref[p] = len(tgroups) } @@ -218,8 +214,10 @@ func (fd *FileDiscovery) refresh(ch chan<- []*config.TargetGroup) { m, ok := ref[f] if !ok || n > m { for i := m; i < n; i++ { - ch <- []*config.TargetGroup{ - {Source: fileSource(f, i)}, + select { + case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}: + case <-ctx.Done(): + return } } } diff --git a/retrieval/discovery/file_test.go b/retrieval/discovery/file_test.go index 10ff49dbab..9751890679 100644 --- a/retrieval/discovery/file_test.go +++ b/retrieval/discovery/file_test.go @@ -106,11 +106,17 @@ retry: // not try to make sense of it all... drained := make(chan struct{}) go func() { - for tgs := range ch { - // Below we will change the file to a bad syntax. Previously extracted target - // groups must not be deleted via sending an empty target group. - if len(tgs[0].Targets) == 0 { - t.Errorf("Unexpected empty target groups received: %s", tgs) + Loop: + for { + select { + case tgs := <-ch: + // Below we will change the file to a bad syntax. Previously extracted target + // groups must not be deleted via sending an empty target group. + if len(tgs[0].Targets) == 0 { + t.Errorf("Unexpected empty target groups received: %s", tgs) + } + case <-time.After(500 * time.Millisecond): + break Loop } } close(drained) diff --git a/retrieval/discovery/gce.go b/retrieval/discovery/gce.go index d098c5608f..c0f8e311df 100644 --- a/retrieval/discovery/gce.go +++ b/retrieval/discovery/gce.go @@ -108,14 +108,15 @@ func NewGCEDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { // Run implements the TargetProvider interface. func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - // Get an initial set right away. tg, err := gd.refresh() if err != nil { log.Error(err) } else { - ch <- []*config.TargetGroup{tg} + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + } } ticker := time.NewTicker(gd.interval) @@ -127,8 +128,11 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup tg, err := gd.refresh() if err != nil { log.Error(err) - } else { - ch <- []*config.TargetGroup{tg} + continue + } + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): } case <-ctx.Done(): return diff --git a/retrieval/discovery/kubernetes/kubernetes.go b/retrieval/discovery/kubernetes/kubernetes.go index b289c2a98c..3d2d7a54d0 100644 --- a/retrieval/discovery/kubernetes/kubernetes.go +++ b/retrieval/discovery/kubernetes/kubernetes.go @@ -107,8 +107,6 @@ const resyncPeriod = 10 * time.Minute // Run implements the TargetProvider interface. func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - rclient := k.client.Core().GetRESTClient() switch k.role { diff --git a/retrieval/discovery/marathon/marathon.go b/retrieval/discovery/marathon/marathon.go index 5e97763a11..6f9668b668 100644 --- a/retrieval/discovery/marathon/marathon.go +++ b/retrieval/discovery/marathon/marathon.go @@ -103,8 +103,6 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { // Run implements the TargetProvider interface. func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { - defer close(ch) - for { select { case <-ctx.Done():