From 9b5e7623f457db15715f5eba99983ec9c4beab86 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Tue, 13 Aug 2024 14:34:26 +0200 Subject: [PATCH] Add support for multiple listening addresses Fixes #9105 Signed-off-by: Julien Pivotto Signed-off-by: Julien --- cmd/prometheus/main.go | 14 +- docs/command-line/prometheus.md | 6 +- docs/command-line/promtool.md | 14 +- go.mod | 2 +- util/documentcli/documentcli.go | 11 ++ util/netconnlimit/netconnlimit.go | 97 ++++++++++++ util/netconnlimit/netconnlimit_test.go | 124 +++++++++++++++ web/web.go | 38 +++-- web/web_test.go | 208 +++++++++++++++++-------- 9 files changed, 420 insertions(+), 94 deletions(-) create mode 100644 util/netconnlimit/netconnlimit.go create mode 100644 util/netconnlimit/netconnlimit_test.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 94924d2c4..c402fbcb8 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -297,8 +297,8 @@ func main() { a.Flag("config.file", "Prometheus configuration file path."). Default("prometheus.yml").StringVar(&cfg.configFile) - a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry."). - Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress) + a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry. Can be repeated."). + Default("0.0.0.0:9090").StringsVar(&cfg.web.ListenAddresses) a.Flag("auto-gomemlimit.ratio", "The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory"). Default("0.9").FloatVar(&cfg.memlimitRatio) @@ -312,7 +312,7 @@ func main() { "Maximum duration before timing out read of the request, and closing idle connections."). Default("5m").SetValue(&cfg.webTimeout) - a.Flag("web.max-connections", "Maximum number of simultaneous connections."). + a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners."). Default("512").IntVar(&cfg.web.MaxConnections) a.Flag("web.external-url", @@ -544,7 +544,7 @@ func main() { localStoragePath = cfg.agentStoragePath } - cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddress) + cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddresses[0]) if err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("parse external URL %q: %w", cfg.prometheusURL, err)) os.Exit(2) @@ -990,9 +990,9 @@ func main() { }) } - listener, err := webHandler.Listener() + listeners, err := webHandler.Listeners() if err != nil { - level.Error(logger).Log("msg", "Unable to start web listener", "err", err) + level.Error(logger).Log("msg", "Unable to start web listeners", "err", err) os.Exit(1) } @@ -1287,7 +1287,7 @@ func main() { // Web handler. g.Add( func() error { - if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil { + if err := webHandler.Run(ctxWeb, listeners, *webConfig); err != nil { return fmt.Errorf("error starting web server: %w", err) } return nil diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index a16e807e1..f65c260c4 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -15,11 +15,11 @@ The Prometheus monitoring server | -h, --help | Show context-sensitive help (also try --help-long and --help-man). | | | --version | Show application version. | | | --config.file | Prometheus configuration file path. | `prometheus.yml` | -| --web.listen-address | Address to listen on for UI, API, and telemetry. | `0.0.0.0:9090` | +| --web.listen-address ... | Address to listen on for UI, API, and telemetry. Can be repeated. | `0.0.0.0:9090` | | --auto-gomemlimit.ratio | The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory | `0.9` | | --web.config.file | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | | | --web.read-timeout | Maximum duration before timing out read of the request, and closing idle connections. | `5m` | -| --web.max-connections | Maximum number of simultaneous connections. | `512` | +| --web.max-connections | Maximum number of simultaneous connections across all listeners. | `512` | | --web.external-url | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | | | --web.route-prefix | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | | | --web.user-assets | Path to static asset directory, available at /user. | | @@ -57,7 +57,7 @@ The Prometheus monitoring server | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | | --scrape.name-escaping-scheme | Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots". | `values` | -| --enable-feature | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature ... | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index 6bb80169a..0c397387f 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -15,7 +15,7 @@ Tooling for the Prometheus monitoring system. | -h, --help | Show context-sensitive help (also try --help-long and --help-man). | | --version | Show application version. | | --experimental | Enable experimental commands. | -| --enable-feature | Comma separated feature names to enable (only PromQL related and no-default-scrape-port). See https://prometheus.io/docs/prometheus/latest/feature_flags/ for the options and more details. | +| --enable-feature ... | Comma separated feature names to enable (only PromQL related and no-default-scrape-port). See https://prometheus.io/docs/prometheus/latest/feature_flags/ for the options and more details. | @@ -281,7 +281,7 @@ Run series query. | Flag | Description | | --- | --- | -| --match | Series selector. Can be specified multiple times. | +| --match ... | Series selector. Can be specified multiple times. | | --start | Start time (RFC3339 or Unix timestamp). | | --end | End time (RFC3339 or Unix timestamp). | @@ -309,7 +309,7 @@ Run labels query. | --- | --- | | --start | Start time (RFC3339 or Unix timestamp). | | --end | End time (RFC3339 or Unix timestamp). | -| --match | Series selector. Can be specified multiple times. | +| --match ... | Series selector. Can be specified multiple times. | @@ -338,7 +338,7 @@ Run queries against your Prometheus to analyze the usage pattern of certain metr | --type | Type of metric: histogram. | | | --duration | Time frame to analyze. | `1h` | | --time | Query time (RFC3339 or Unix timestamp), defaults to now. | | -| --match | Series selector. Can be specified multiple times. | | +| --match ... | Series selector. Can be specified multiple times. | | @@ -461,7 +461,7 @@ Unit tests for rules. | Flag | Description | Default | | --- | --- | --- | -| --run | If set, will only run test groups whose names match the regular expression. Can be specified multiple times. | | +| --run ... | If set, will only run test groups whose names match the regular expression. Can be specified multiple times. | | | --diff | [Experimental] Print colored differential output between expected & received output. | `false` | @@ -578,7 +578,7 @@ Dump samples from a TSDB. | --sandbox-dir-root | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` | | --min-time | Minimum timestamp to dump. | `-9223372036854775808` | | --max-time | Maximum timestamp to dump. | `9223372036854775807` | -| --match | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` | +| --match ... | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` | @@ -605,7 +605,7 @@ Dump samples from a TSDB. | --sandbox-dir-root | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` | | --min-time | Minimum timestamp to dump. | `-9223372036854775808` | | --max-time | Maximum timestamp to dump. | `9223372036854775807` | -| --match | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` | +| --match ... | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` | diff --git a/go.mod b/go.mod index 1c92e52bd..50d560bc3 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,6 @@ require ( go.uber.org/automaxprocs v1.5.3 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 - golang.org/x/net v0.27.0 golang.org/x/oauth2 v0.21.0 golang.org/x/sync v0.7.0 golang.org/x/sys v0.22.0 @@ -190,6 +189,7 @@ require ( golang.org/x/crypto v0.25.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.19.0 // indirect + golang.org/x/net v0.27.0 // indirect golang.org/x/term v0.22.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/util/documentcli/documentcli.go b/util/documentcli/documentcli.go index 9de2bb8d4..6964952af 100644 --- a/util/documentcli/documentcli.go +++ b/util/documentcli/documentcli.go @@ -23,6 +23,7 @@ import ( "bytes" "fmt" "io" + "reflect" "strings" "github.com/alecthomas/kingpin/v2" @@ -75,6 +76,16 @@ func createFlagRow(flag *kingpin.FlagModel) []string { name = fmt.Sprintf(`-%c, --%s`, flag.Short, flag.Name) } + valueType := reflect.TypeOf(flag.Value) + if valueType.Kind() == reflect.Ptr { + valueType = valueType.Elem() + } + if valueType.Kind() == reflect.Struct { + if _, found := valueType.FieldByName("slice"); found { + name = fmt.Sprintf(`%s ...`, name) + } + } + return []string{name, strings.ReplaceAll(flag.Help, "|", `\|`), defaultVal} } diff --git a/util/netconnlimit/netconnlimit.go b/util/netconnlimit/netconnlimit.go new file mode 100644 index 000000000..3bdd805b8 --- /dev/null +++ b/util/netconnlimit/netconnlimit.go @@ -0,0 +1,97 @@ +// Copyright 2024 The Prometheus Authors +// Based on golang.org/x/net/netutil: +// Copyright 2013 The Go Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package netconnlimit provides network utility functions for limiting +// simultaneous connections across multiple listeners. +package netconnlimit + +import ( + "net" + "sync" +) + +// NewSharedSemaphore creates and returns a new semaphore channel that can be used +// to limit the number of simultaneous connections across multiple listeners. +func NewSharedSemaphore(n int) chan struct{} { + return make(chan struct{}, n) +} + +// SharedLimitListener returns a listener that accepts at most n simultaneous +// connections across multiple listeners using the provided shared semaphore. +func SharedLimitListener(l net.Listener, sem chan struct{}) net.Listener { + return &sharedLimitListener{ + Listener: l, + sem: sem, + done: make(chan struct{}), + } +} + +type sharedLimitListener struct { + net.Listener + sem chan struct{} + closeOnce sync.Once // Ensures the done chan is only closed once. + done chan struct{} // No values sent; closed when Close is called. +} + +// Acquire acquires the shared semaphore. Returns true if successfully +// acquired, false if the listener is closed and the semaphore is not +// acquired. +func (l *sharedLimitListener) acquire() bool { + select { + case <-l.done: + return false + case l.sem <- struct{}{}: + return true + } +} + +func (l *sharedLimitListener) release() { <-l.sem } + +func (l *sharedLimitListener) Accept() (net.Conn, error) { + if !l.acquire() { + for { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + c.Close() + } + } + + c, err := l.Listener.Accept() + if err != nil { + l.release() + return nil, err + } + return &sharedLimitListenerConn{Conn: c, release: l.release}, nil +} + +func (l *sharedLimitListener) Close() error { + err := l.Listener.Close() + l.closeOnce.Do(func() { close(l.done) }) + return err +} + +type sharedLimitListenerConn struct { + net.Conn + releaseOnce sync.Once + release func() +} + +func (l *sharedLimitListenerConn) Close() error { + err := l.Conn.Close() + l.releaseOnce.Do(l.release) + return err +} diff --git a/util/netconnlimit/netconnlimit_test.go b/util/netconnlimit/netconnlimit_test.go new file mode 100644 index 000000000..e4d490420 --- /dev/null +++ b/util/netconnlimit/netconnlimit_test.go @@ -0,0 +1,124 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package netconnlimit + +import ( + "io" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSharedLimitListenerConcurrency(t *testing.T) { + testCases := []struct { + name string + semCapacity int + connCount int + expected int // Expected number of connections processed simultaneously. + }{ + { + name: "Single connection allowed", + semCapacity: 1, + connCount: 3, + expected: 1, + }, + { + name: "Two connections allowed", + semCapacity: 2, + connCount: 3, + expected: 2, + }, + { + name: "Three connections allowed", + semCapacity: 3, + connCount: 3, + expected: 3, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sem := NewSharedSemaphore(tc.semCapacity) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to create listener") + defer listener.Close() + + limitedListener := SharedLimitListener(listener, sem) + + var wg sync.WaitGroup + var activeConnCount int64 + var mu sync.Mutex + + wg.Add(tc.connCount) + + // Accept connections. + for i := 0; i < tc.connCount; i++ { + go func() { + defer wg.Done() + + conn, err := limitedListener.Accept() + require.NoError(t, err, "failed to accept connection") + defer conn.Close() + + // Simulate work and track the active connection count. + mu.Lock() + activeConnCount++ + require.LessOrEqual(t, activeConnCount, int64(tc.expected), "too many simultaneous connections") + mu.Unlock() + + time.Sleep(100 * time.Millisecond) + + mu.Lock() + activeConnCount-- + mu.Unlock() + }() + } + + // Create clients that attempt to connect to the listener. + for i := 0; i < tc.connCount; i++ { + go func() { + conn, err := net.Dial("tcp", listener.Addr().String()) + require.NoError(t, err, "failed to connect to listener") + defer conn.Close() + _, _ = io.WriteString(conn, "hello") + }() + } + + wg.Wait() + + // Ensure all connections are released and semaphore is empty. + require.Empty(t, sem) + }) + } +} + +func TestSharedLimitListenerClose(t *testing.T) { + sem := NewSharedSemaphore(2) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to create listener") + + limitedListener := SharedLimitListener(listener, sem) + + // Close the listener and ensure it does not accept new connections. + err = limitedListener.Close() + require.NoError(t, err, "failed to close listener") + + conn, err := limitedListener.Accept() + require.Error(t, err, "expected error on accept after listener closed") + if conn != nil { + conn.Close() + } +} diff --git a/web/web.go b/web/web.go index 8e84acd03..098baa055 100644 --- a/web/web.go +++ b/web/web.go @@ -49,7 +49,6 @@ import ( toolkit_web "github.com/prometheus/exporter-toolkit/web" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/atomic" - "golang.org/x/net/netutil" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" @@ -59,6 +58,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/template" "github.com/prometheus/prometheus/util/httputil" + "github.com/prometheus/prometheus/util/netconnlimit" api_v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/prometheus/prometheus/web/ui" ) @@ -244,7 +244,7 @@ type Options struct { Version *PrometheusVersion Flags map[string]string - ListenAddress string + ListenAddresses []string CORSOrigin *regexp.Regexp ReadTimeout time.Duration MaxConnections int @@ -334,7 +334,7 @@ func New(logger log.Logger, o *Options) *Handler { }, o.Flags, api_v1.GlobalURLOptions{ - ListenAddress: o.ListenAddress, + ListenAddress: o.ListenAddresses[0], Host: o.ExternalURL.Host, Scheme: o.ExternalURL.Scheme, }, @@ -566,15 +566,29 @@ func (h *Handler) Reload() <-chan chan error { return h.reloadCh } -// Listener creates the TCP listener for web requests. -func (h *Handler) Listener() (net.Listener, error) { - level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) +// Listeners creates the TCP listeners for web requests. +func (h *Handler) Listeners() ([]net.Listener, error) { + var listeners []net.Listener + sem := netconnlimit.NewSharedSemaphore(h.options.MaxConnections) + for _, address := range h.options.ListenAddresses { + listener, err := h.Listener(address, sem) + if err != nil { + return listeners, err + } + listeners = append(listeners, listener) + } + return listeners, nil +} - listener, err := net.Listen("tcp", h.options.ListenAddress) +// Listener creates the TCP listener for web requests. +func (h *Handler) Listener(address string, sem chan struct{}) (net.Listener, error) { + level.Info(h.logger).Log("msg", "Start listening for connections", "address", address) + + listener, err := net.Listen("tcp", address) if err != nil { return listener, err } - listener = netutil.LimitListener(listener, h.options.MaxConnections) + listener = netconnlimit.SharedLimitListener(listener, sem) // Monitor incoming connections with conntrack. listener = conntrack.NewListener(listener, @@ -585,10 +599,10 @@ func (h *Handler) Listener() (net.Listener, error) { } // Run serves the HTTP endpoints. -func (h *Handler) Run(ctx context.Context, listener net.Listener, webConfig string) error { - if listener == nil { +func (h *Handler) Run(ctx context.Context, listeners []net.Listener, webConfig string) error { + if len(listeners) == 0 { var err error - listener, err = h.Listener() + listeners, err = h.Listeners() if err != nil { return err } @@ -623,7 +637,7 @@ func (h *Handler) Run(ctx context.Context, listener net.Listener, webConfig stri errCh := make(chan error, 1) go func() { - errCh <- toolkit_web.Serve(listener, httpSrv, &toolkit_web.FlagConfig{WebConfigFile: &webConfig}, h.logger) + errCh <- toolkit_web.ServeMultiple(listeners, httpSrv, &toolkit_web.FlagConfig{WebConfigFile: &webConfig}, h.logger) }() select { diff --git a/web/web_test.go b/web/web_test.go index e1fa66fa8..b660746b1 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -73,19 +73,19 @@ func TestReadyAndHealthy(t *testing.T) { port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) opts := &Options{ - ListenAddress: port, - ReadTimeout: 30 * time.Second, - MaxConnections: 512, - Context: nil, - Storage: nil, - LocalStorage: &dbAdapter{db}, - TSDBDir: dbDir, - QueryEngine: nil, - ScrapeManager: &scrape.Manager{}, - RuleManager: &rules.Manager{}, - Notifier: nil, - RoutePrefix: "/", - EnableAdminAPI: true, + ListenAddresses: []string{port}, + ReadTimeout: 30 * time.Second, + MaxConnections: 512, + Context: nil, + Storage: nil, + LocalStorage: &dbAdapter{db}, + TSDBDir: dbDir, + QueryEngine: nil, + ScrapeManager: &scrape.Manager{}, + RuleManager: &rules.Manager{}, + Notifier: nil, + RoutePrefix: "/", + EnableAdminAPI: true, ExternalURL: &url.URL{ Scheme: "http", Host: "localhost" + port, @@ -101,9 +101,9 @@ func TestReadyAndHealthy(t *testing.T) { webHandler.config = &config.Config{} webHandler.notifier = ¬ifier.Manager{} - l, err := webHandler.Listener() + l, err := webHandler.Listeners() if err != nil { - panic(fmt.Sprintf("Unable to start web listener: %s", err)) + panic(fmt.Sprintf("Unable to start web listeners: %s", err)) } ctx, cancel := context.WithCancel(context.Background()) @@ -198,19 +198,19 @@ func TestRoutePrefix(t *testing.T) { port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) opts := &Options{ - ListenAddress: port, - ReadTimeout: 30 * time.Second, - MaxConnections: 512, - Context: nil, - TSDBDir: dbDir, - LocalStorage: &dbAdapter{db}, - Storage: nil, - QueryEngine: nil, - ScrapeManager: nil, - RuleManager: nil, - Notifier: nil, - RoutePrefix: "/prometheus", - EnableAdminAPI: true, + ListenAddresses: []string{port}, + ReadTimeout: 30 * time.Second, + MaxConnections: 512, + Context: nil, + TSDBDir: dbDir, + LocalStorage: &dbAdapter{db}, + Storage: nil, + QueryEngine: nil, + ScrapeManager: nil, + RuleManager: nil, + Notifier: nil, + RoutePrefix: "/prometheus", + EnableAdminAPI: true, ExternalURL: &url.URL{ Host: "localhost.localdomain" + port, Scheme: "http", @@ -220,9 +220,9 @@ func TestRoutePrefix(t *testing.T) { opts.Flags = map[string]string{} webHandler := New(nil, opts) - l, err := webHandler.Listener() + l, err := webHandler.Listeners() if err != nil { - panic(fmt.Sprintf("Unable to start web listener: %s", err)) + panic(fmt.Sprintf("Unable to start web listeners: %s", err)) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -299,8 +299,8 @@ func TestDebugHandler(t *testing.T) { {"/foo", "/bar/debug/pprof/goroutine", 404}, } { opts := &Options{ - RoutePrefix: tc.prefix, - ListenAddress: "somehost:9090", + RoutePrefix: tc.prefix, + ListenAddresses: []string{"somehost:9090"}, ExternalURL: &url.URL{ Host: "localhost.localdomain:9090", Scheme: "http", @@ -324,8 +324,8 @@ func TestDebugHandler(t *testing.T) { func TestHTTPMetrics(t *testing.T) { t.Parallel() handler := New(nil, &Options{ - RoutePrefix: "/", - ListenAddress: "somehost:9090", + RoutePrefix: "/", + ListenAddresses: []string{"somehost:9090"}, ExternalURL: &url.URL{ Host: "localhost.localdomain:9090", Scheme: "http", @@ -381,18 +381,18 @@ func TestShutdownWithStaleConnection(t *testing.T) { port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) opts := &Options{ - ListenAddress: port, - ReadTimeout: timeout, - MaxConnections: 512, - Context: nil, - Storage: nil, - LocalStorage: &dbAdapter{db}, - TSDBDir: dbDir, - QueryEngine: nil, - ScrapeManager: &scrape.Manager{}, - RuleManager: &rules.Manager{}, - Notifier: nil, - RoutePrefix: "/", + ListenAddresses: []string{port}, + ReadTimeout: timeout, + MaxConnections: 512, + Context: nil, + Storage: nil, + LocalStorage: &dbAdapter{db}, + TSDBDir: dbDir, + QueryEngine: nil, + ScrapeManager: &scrape.Manager{}, + RuleManager: &rules.Manager{}, + Notifier: nil, + RoutePrefix: "/", ExternalURL: &url.URL{ Scheme: "http", Host: "localhost" + port, @@ -408,9 +408,9 @@ func TestShutdownWithStaleConnection(t *testing.T) { webHandler.config = &config.Config{} webHandler.notifier = ¬ifier.Manager{} - l, err := webHandler.Listener() + l, err := webHandler.Listeners() if err != nil { - panic(fmt.Sprintf("Unable to start web listener: %s", err)) + panic(fmt.Sprintf("Unable to start web listeners: %s", err)) } closed := make(chan struct{}) @@ -448,7 +448,7 @@ func TestHandleMultipleQuitRequests(t *testing.T) { port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) opts := &Options{ - ListenAddress: port, + ListenAddresses: []string{port}, MaxConnections: 512, EnableLifecycle: true, RoutePrefix: "/", @@ -461,9 +461,9 @@ func TestHandleMultipleQuitRequests(t *testing.T) { webHandler := New(nil, opts) webHandler.config = &config.Config{} webHandler.notifier = ¬ifier.Manager{} - l, err := webHandler.Listener() + l, err := webHandler.Listeners() if err != nil { - panic(fmt.Sprintf("Unable to start web listener: %s", err)) + panic(fmt.Sprintf("Unable to start web listeners: %s", err)) } ctx, cancel := context.WithCancel(context.Background()) closed := make(chan struct{}) @@ -513,17 +513,17 @@ func TestAgentAPIEndPoints(t *testing.T) { port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) opts := &Options{ - ListenAddress: port, - ReadTimeout: 30 * time.Second, - MaxConnections: 512, - Context: nil, - Storage: nil, - QueryEngine: nil, - ScrapeManager: &scrape.Manager{}, - RuleManager: &rules.Manager{}, - Notifier: nil, - RoutePrefix: "/", - EnableAdminAPI: true, + ListenAddresses: []string{port}, + ReadTimeout: 30 * time.Second, + MaxConnections: 512, + Context: nil, + Storage: nil, + QueryEngine: nil, + ScrapeManager: &scrape.Manager{}, + RuleManager: &rules.Manager{}, + Notifier: nil, + RoutePrefix: "/", + EnableAdminAPI: true, ExternalURL: &url.URL{ Scheme: "http", Host: "localhost" + port, @@ -540,9 +540,9 @@ func TestAgentAPIEndPoints(t *testing.T) { webHandler.SetReady(true) webHandler.config = &config.Config{} webHandler.notifier = ¬ifier.Manager{} - l, err := webHandler.Listener() + l, err := webHandler.Listeners() if err != nil { - panic(fmt.Sprintf("Unable to start web listener: %s", err)) + panic(fmt.Sprintf("Unable to start web listeners: %s", err)) } ctx, cancel := context.WithCancel(context.Background()) @@ -628,3 +628,83 @@ func cleanupSnapshot(t *testing.T, dbDir string, resp *http.Response) { require.NoError(t, os.Remove(filepath.Join(dbDir, "snapshots", snapshot.Data.Name))) require.NoError(t, os.Remove(filepath.Join(dbDir, "snapshots"))) } + +func TestMultipleListenAddresses(t *testing.T) { + t.Parallel() + + dbDir := t.TempDir() + + db, err := tsdb.Open(dbDir, nil, nil, nil, nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // Create multiple ports for testing multiple ListenAddresses + port1 := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) + port2 := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t)) + + opts := &Options{ + ListenAddresses: []string{port1, port2}, + ReadTimeout: 30 * time.Second, + MaxConnections: 512, + Context: nil, + Storage: nil, + LocalStorage: &dbAdapter{db}, + TSDBDir: dbDir, + QueryEngine: nil, + ScrapeManager: &scrape.Manager{}, + RuleManager: &rules.Manager{}, + Notifier: nil, + RoutePrefix: "/", + EnableAdminAPI: true, + ExternalURL: &url.URL{ + Scheme: "http", + Host: "localhost" + port1, + Path: "/", + }, + Version: &PrometheusVersion{}, + Gatherer: prometheus.DefaultGatherer, + } + + opts.Flags = map[string]string{} + + webHandler := New(nil, opts) + + webHandler.config = &config.Config{} + webHandler.notifier = ¬ifier.Manager{} + l, err := webHandler.Listeners() + if err != nil { + panic(fmt.Sprintf("Unable to start web listener: %s", err)) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + err := webHandler.Run(ctx, l, "") + if err != nil { + panic(fmt.Sprintf("Can't start web handler:%s", err)) + } + }() + + // Give some time for the web goroutine to run since we need the server + // to be up before starting tests. + time.Sleep(5 * time.Second) + + // Set to ready. + webHandler.SetReady(true) + + for _, port := range []string{port1, port2} { + baseURL := "http://localhost" + port + + resp, err := http.Get(baseURL + "/-/healthy") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + cleanupTestResponse(t, resp) + + resp, err = http.Get(baseURL + "/-/ready") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + cleanupTestResponse(t, resp) + } +}