diff --git a/web/web.go b/web/web.go index 9955facab8..ee16d2004b 100644 --- a/web/web.go +++ b/web/web.go @@ -540,6 +540,10 @@ func (h *Handler) Run(ctx context.Context) error { httpl = m.Match(cmux.HTTP1Fast()) grpcSrv = grpc.NewServer() ) + + // Prevent open connections to block the shutdown of the handler. + m.SetReadTimeout(h.options.ReadTimeout) + av2 := api_v2.New( h.options.LocalStorage, h.options.TSDBDir, @@ -603,11 +607,27 @@ func (h *Handler) Run(ctx context.Context) error { return e case <-ctx.Done(): httpSrv.Shutdown(ctx) - grpcSrv.GracefulStop() + stopGRPCSrv(grpcSrv) return nil } } +// stopGRPCSrv stops a given GRPC server. An attempt to stop the server +// gracefully is made first. After 15s, the server to forced to stop. +func stopGRPCSrv(srv *grpc.Server) { + stop := make(chan struct{}) + go func() { + srv.GracefulStop() + close(stop) + }() + + select { + case <-time.After(15 * time.Second): + srv.Stop() + case <-stop: + } +} + func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) { var groups []*rules.Group diff --git a/web/web_test.go b/web/web_test.go index 96c4c5c4f5..a721e3050e 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io/ioutil" + "net" "net/http" "net/http/httptest" "net/url" @@ -138,8 +139,10 @@ func TestReadyAndHealthy(t *testing.T) { webHandler.config = &config.Config{} webHandler.notifier = ¬ifier.Manager{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { - err := webHandler.Run(context.Background()) + err := webHandler.Run(ctx) if err != nil { panic(fmt.Sprintf("Can't start web handler:%s", err)) } @@ -323,8 +326,10 @@ func TestRoutePrefix(t *testing.T) { opts.Flags = map[string]string{} webHandler := New(nil, opts) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { - err := webHandler.Run(context.Background()) + err := webHandler.Run(ctx) if err != nil { panic(fmt.Sprintf("Can't start web handler:%s", err)) } @@ -459,3 +464,73 @@ func TestHTTPMetrics(t *testing.T) { testutil.Equals(t, 2, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusOK))))) testutil.Equals(t, 1, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusServiceUnavailable))))) } + +func TestShutdownWithStaleConnection(t *testing.T) { + dbDir, err := ioutil.TempDir("", "tsdb-ready") + testutil.Ok(t, err) + defer testutil.Ok(t, os.RemoveAll(dbDir)) + + db, err := tsdb.Open(dbDir, nil, nil, nil) + testutil.Ok(t, err) + + timeout := 10 * time.Second + + opts := &Options{ + ListenAddress: ":9090", + ReadTimeout: timeout, + MaxConnections: 512, + Context: nil, + Storage: nil, + LocalStorage: &dbAdapter{db}, + TSDBDir: dbDir, + QueryEngine: nil, + ScrapeManager: &scrape.Manager{}, + RuleManager: &rules.Manager{}, + Notifier: nil, + RoutePrefix: "/", + ExternalURL: &url.URL{ + Scheme: "http", + Host: "localhost:9090", + Path: "/", + }, + Version: &PrometheusVersion{}, + Gatherer: prometheus.DefaultGatherer, + } + + opts.Flags = map[string]string{} + + webHandler := New(nil, opts) + + webHandler.config = &config.Config{} + webHandler.notifier = ¬ifier.Manager{} + + closed := make(chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err := webHandler.Run(ctx) + if err != nil { + panic(fmt.Sprintf("Can't start web handler:%s", err)) + } + close(closed) + }() + + // Give some time for the web goroutine to run since we need the server + // to be up before starting tests. + time.Sleep(5 * time.Second) + + // Open a socket, and don't use it. This connection should then be closed + // after the ReadTimeout. + c, err := net.Dial("tcp", "localhost:9090") + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, c.Close()) }) + + // Stop the web handler. + cancel() + + select { + case <-closed: + case <-time.After(timeout + 5*time.Second): + t.Fatalf("Server still running after read timeout.") + } +}