mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Merge pull request #14665 from roidelapluie/multiple-listening-addresses
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Add support for multiple listening addresses
This commit is contained in:
commit
349068ad3e
|
@ -297,8 +297,8 @@ func main() {
|
|||
a.Flag("config.file", "Prometheus configuration file path.").
|
||||
Default("prometheus.yml").StringVar(&cfg.configFile)
|
||||
|
||||
a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
|
||||
Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)
|
||||
a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry. Can be repeated.").
|
||||
Default("0.0.0.0:9090").StringsVar(&cfg.web.ListenAddresses)
|
||||
|
||||
a.Flag("auto-gomemlimit.ratio", "The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory").
|
||||
Default("0.9").FloatVar(&cfg.memlimitRatio)
|
||||
|
@ -312,7 +312,7 @@ func main() {
|
|||
"Maximum duration before timing out read of the request, and closing idle connections.").
|
||||
Default("5m").SetValue(&cfg.webTimeout)
|
||||
|
||||
a.Flag("web.max-connections", "Maximum number of simultaneous connections.").
|
||||
a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners.").
|
||||
Default("512").IntVar(&cfg.web.MaxConnections)
|
||||
|
||||
a.Flag("web.external-url",
|
||||
|
@ -544,7 +544,7 @@ func main() {
|
|||
localStoragePath = cfg.agentStoragePath
|
||||
}
|
||||
|
||||
cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddress)
|
||||
cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddresses[0])
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, fmt.Errorf("parse external URL %q: %w", cfg.prometheusURL, err))
|
||||
os.Exit(2)
|
||||
|
@ -990,9 +990,9 @@ func main() {
|
|||
})
|
||||
}
|
||||
|
||||
listener, err := webHandler.Listener()
|
||||
listeners, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
|
||||
level.Error(logger).Log("msg", "Unable to start web listeners", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
@ -1287,7 +1287,7 @@ func main() {
|
|||
// Web handler.
|
||||
g.Add(
|
||||
func() error {
|
||||
if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil {
|
||||
if err := webHandler.Run(ctxWeb, listeners, *webConfig); err != nil {
|
||||
return fmt.Errorf("error starting web server: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -15,11 +15,11 @@ The Prometheus monitoring server
|
|||
| <code class="text-nowrap">-h</code>, <code class="text-nowrap">--help</code> | Show context-sensitive help (also try --help-long and --help-man). | |
|
||||
| <code class="text-nowrap">--version</code> | Show application version. | |
|
||||
| <code class="text-nowrap">--config.file</code> | Prometheus configuration file path. | `prometheus.yml` |
|
||||
| <code class="text-nowrap">--web.listen-address</code> | Address to listen on for UI, API, and telemetry. | `0.0.0.0:9090` |
|
||||
| <code class="text-nowrap">--web.listen-address</code> <code class="text-nowrap">...</code> | Address to listen on for UI, API, and telemetry. Can be repeated. | `0.0.0.0:9090` |
|
||||
| <code class="text-nowrap">--auto-gomemlimit.ratio</code> | The ratio of reserved GOMEMLIMIT memory to the detected maximum container or system memory | `0.9` |
|
||||
| <code class="text-nowrap">--web.config.file</code> | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | |
|
||||
| <code class="text-nowrap">--web.read-timeout</code> | Maximum duration before timing out read of the request, and closing idle connections. | `5m` |
|
||||
| <code class="text-nowrap">--web.max-connections</code> | Maximum number of simultaneous connections. | `512` |
|
||||
| <code class="text-nowrap">--web.max-connections</code> | Maximum number of simultaneous connections across all listeners. | `512` |
|
||||
| <code class="text-nowrap">--web.external-url</code> | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | |
|
||||
| <code class="text-nowrap">--web.route-prefix</code> | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | |
|
||||
| <code class="text-nowrap">--web.user-assets</code> | Path to static asset directory, available at /user. | |
|
||||
|
@ -57,7 +57,7 @@ The Prometheus monitoring server
|
|||
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
|
||||
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
|
||||
| <code class="text-nowrap">--scrape.name-escaping-scheme</code> | Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots". | `values` |
|
||||
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable. Valid options: agent, auto-gomemlimit, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
|
||||
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
|
||||
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ Tooling for the Prometheus monitoring system.
|
|||
| <code class="text-nowrap">-h</code>, <code class="text-nowrap">--help</code> | Show context-sensitive help (also try --help-long and --help-man). |
|
||||
| <code class="text-nowrap">--version</code> | Show application version. |
|
||||
| <code class="text-nowrap">--experimental</code> | Enable experimental commands. |
|
||||
| <code class="text-nowrap">--enable-feature</code> | Comma separated feature names to enable (only PromQL related and no-default-scrape-port). See https://prometheus.io/docs/prometheus/latest/feature_flags/ for the options and more details. |
|
||||
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable (only PromQL related and no-default-scrape-port). See https://prometheus.io/docs/prometheus/latest/feature_flags/ for the options and more details. |
|
||||
|
||||
|
||||
|
||||
|
@ -281,7 +281,7 @@ Run series query.
|
|||
|
||||
| Flag | Description |
|
||||
| --- | --- |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. |
|
||||
| <code class="text-nowrap">--match</code> <code class="text-nowrap">...</code> | Series selector. Can be specified multiple times. |
|
||||
| <code class="text-nowrap">--start</code> | Start time (RFC3339 or Unix timestamp). |
|
||||
| <code class="text-nowrap">--end</code> | End time (RFC3339 or Unix timestamp). |
|
||||
|
||||
|
@ -309,7 +309,7 @@ Run labels query.
|
|||
| --- | --- |
|
||||
| <code class="text-nowrap">--start</code> | Start time (RFC3339 or Unix timestamp). |
|
||||
| <code class="text-nowrap">--end</code> | End time (RFC3339 or Unix timestamp). |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. |
|
||||
| <code class="text-nowrap">--match</code> <code class="text-nowrap">...</code> | Series selector. Can be specified multiple times. |
|
||||
|
||||
|
||||
|
||||
|
@ -338,7 +338,7 @@ Run queries against your Prometheus to analyze the usage pattern of certain metr
|
|||
| <code class="text-nowrap">--type</code> | Type of metric: histogram. | |
|
||||
| <code class="text-nowrap">--duration</code> | Time frame to analyze. | `1h` |
|
||||
| <code class="text-nowrap">--time</code> | Query time (RFC3339 or Unix timestamp), defaults to now. | |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | |
|
||||
| <code class="text-nowrap">--match</code> <code class="text-nowrap">...</code> | Series selector. Can be specified multiple times. | |
|
||||
|
||||
|
||||
|
||||
|
@ -461,7 +461,7 @@ Unit tests for rules.
|
|||
|
||||
| Flag | Description | Default |
|
||||
| --- | --- | --- |
|
||||
| <code class="text-nowrap">--run</code> | If set, will only run test groups whose names match the regular expression. Can be specified multiple times. | |
|
||||
| <code class="text-nowrap">--run</code> <code class="text-nowrap">...</code> | If set, will only run test groups whose names match the regular expression. Can be specified multiple times. | |
|
||||
| <code class="text-nowrap">--diff</code> | [Experimental] Print colored differential output between expected & received output. | `false` |
|
||||
|
||||
|
||||
|
@ -578,7 +578,7 @@ Dump samples from a TSDB.
|
|||
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
|
||||
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
||||
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||
| <code class="text-nowrap">--match</code> <code class="text-nowrap">...</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||
|
||||
|
||||
|
||||
|
@ -605,7 +605,7 @@ Dump samples from a TSDB.
|
|||
| <code class="text-nowrap">--sandbox-dir-root</code> | Root directory where a sandbox directory would be created in case WAL replay generates chunks. The sandbox directory is cleaned up at the end. | `data/` |
|
||||
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
||||
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
||||
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||
| <code class="text-nowrap">--match</code> <code class="text-nowrap">...</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||
|
||||
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -75,7 +75,6 @@ require (
|
|||
go.uber.org/automaxprocs v1.5.3
|
||||
go.uber.org/goleak v1.3.0
|
||||
go.uber.org/multierr v1.11.0
|
||||
golang.org/x/net v0.27.0
|
||||
golang.org/x/oauth2 v0.21.0
|
||||
golang.org/x/sync v0.7.0
|
||||
golang.org/x/sys v0.22.0
|
||||
|
@ -190,6 +189,7 @@ require (
|
|||
golang.org/x/crypto v0.25.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
|
||||
golang.org/x/mod v0.19.0 // indirect
|
||||
golang.org/x/net v0.27.0 // indirect
|
||||
golang.org/x/term v0.22.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/alecthomas/kingpin/v2"
|
||||
|
@ -75,6 +76,16 @@ func createFlagRow(flag *kingpin.FlagModel) []string {
|
|||
name = fmt.Sprintf(`<code class="text-nowrap">-%c</code>, <code class="text-nowrap">--%s</code>`, flag.Short, flag.Name)
|
||||
}
|
||||
|
||||
valueType := reflect.TypeOf(flag.Value)
|
||||
if valueType.Kind() == reflect.Ptr {
|
||||
valueType = valueType.Elem()
|
||||
}
|
||||
if valueType.Kind() == reflect.Struct {
|
||||
if _, found := valueType.FieldByName("slice"); found {
|
||||
name = fmt.Sprintf(`%s <code class="text-nowrap">...<code class="text-nowrap">`, name)
|
||||
}
|
||||
}
|
||||
|
||||
return []string{name, strings.ReplaceAll(flag.Help, "|", `\|`), defaultVal}
|
||||
}
|
||||
|
||||
|
|
97
util/netconnlimit/netconnlimit.go
Normal file
97
util/netconnlimit/netconnlimit.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Based on golang.org/x/net/netutil:
|
||||
// Copyright 2013 The Go Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package netconnlimit provides network utility functions for limiting
|
||||
// simultaneous connections across multiple listeners.
|
||||
package netconnlimit
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// NewSharedSemaphore returns a buffered channel of capacity n intended to be
// used as a counting semaphore. Handing the same channel to several
// SharedLimitListener wrappers makes them draw from one common pool of n
// connection slots.
func NewSharedSemaphore(n int) chan struct{} {
	slots := make(chan struct{}, n)
	return slots
}
|
||||
|
||||
// SharedLimitListener returns a listener that accepts at most n simultaneous
|
||||
// connections across multiple listeners using the provided shared semaphore.
|
||||
func SharedLimitListener(l net.Listener, sem chan struct{}) net.Listener {
|
||||
return &sharedLimitListener{
|
||||
Listener: l,
|
||||
sem: sem,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// sharedLimitListener is a net.Listener whose accepted connections each hold
// one slot of a semaphore channel supplied by the caller.
type sharedLimitListener struct {
	net.Listener

	// sem is the counting semaphore; a slot is taken by sending on it and
	// given back by receiving from it.
	sem chan struct{}

	// done never carries values; it is closed when Close is called.
	done chan struct{}
	// closeOnce guards done so that it is closed at most once.
	closeOnce sync.Once
}
|
||||
|
||||
// Acquire acquires the shared semaphore. Returns true if successfully
|
||||
// acquired, false if the listener is closed and the semaphore is not
|
||||
// acquired.
|
||||
func (l *sharedLimitListener) acquire() bool {
|
||||
select {
|
||||
case <-l.done:
|
||||
return false
|
||||
case l.sem <- struct{}{}:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// release gives one slot back to the shared semaphore.
func (l *sharedLimitListener) release() {
	<-l.sem
}
|
||||
|
||||
func (l *sharedLimitListener) Accept() (net.Conn, error) {
|
||||
if !l.acquire() {
|
||||
for {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
l.release()
|
||||
return nil, err
|
||||
}
|
||||
return &sharedLimitListenerConn{Conn: c, release: l.release}, nil
|
||||
}
|
||||
|
||||
func (l *sharedLimitListener) Close() error {
|
||||
err := l.Listener.Close()
|
||||
l.closeOnce.Do(func() { close(l.done) })
|
||||
return err
|
||||
}
|
||||
|
||||
// sharedLimitListenerConn is a net.Conn that runs a release callback the
// first time it is closed.
type sharedLimitListenerConn struct {
	net.Conn

	// release is the callback invoked on Close.
	release func()
	// releaseOnce ensures release runs at most once.
	releaseOnce sync.Once
}
|
||||
|
||||
func (l *sharedLimitListenerConn) Close() error {
|
||||
err := l.Conn.Close()
|
||||
l.releaseOnce.Do(l.release)
|
||||
return err
|
||||
}
|
124
util/netconnlimit/netconnlimit_test.go
Normal file
124
util/netconnlimit/netconnlimit_test.go
Normal file
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2024 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package netconnlimit
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSharedLimitListenerConcurrency(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
semCapacity int
|
||||
connCount int
|
||||
expected int // Expected number of connections processed simultaneously.
|
||||
}{
|
||||
{
|
||||
name: "Single connection allowed",
|
||||
semCapacity: 1,
|
||||
connCount: 3,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
name: "Two connections allowed",
|
||||
semCapacity: 2,
|
||||
connCount: 3,
|
||||
expected: 2,
|
||||
},
|
||||
{
|
||||
name: "Three connections allowed",
|
||||
semCapacity: 3,
|
||||
connCount: 3,
|
||||
expected: 3,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
sem := NewSharedSemaphore(tc.semCapacity)
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err, "failed to create listener")
|
||||
defer listener.Close()
|
||||
|
||||
limitedListener := SharedLimitListener(listener, sem)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var activeConnCount int64
|
||||
var mu sync.Mutex
|
||||
|
||||
wg.Add(tc.connCount)
|
||||
|
||||
// Accept connections.
|
||||
for i := 0; i < tc.connCount; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
conn, err := limitedListener.Accept()
|
||||
require.NoError(t, err, "failed to accept connection")
|
||||
defer conn.Close()
|
||||
|
||||
// Simulate work and track the active connection count.
|
||||
mu.Lock()
|
||||
activeConnCount++
|
||||
require.LessOrEqual(t, activeConnCount, int64(tc.expected), "too many simultaneous connections")
|
||||
mu.Unlock()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
mu.Lock()
|
||||
activeConnCount--
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Create clients that attempt to connect to the listener.
|
||||
for i := 0; i < tc.connCount; i++ {
|
||||
go func() {
|
||||
conn, err := net.Dial("tcp", listener.Addr().String())
|
||||
require.NoError(t, err, "failed to connect to listener")
|
||||
defer conn.Close()
|
||||
_, _ = io.WriteString(conn, "hello")
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Ensure all connections are released and semaphore is empty.
|
||||
require.Empty(t, sem)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSharedLimitListenerClose(t *testing.T) {
|
||||
sem := NewSharedSemaphore(2)
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err, "failed to create listener")
|
||||
|
||||
limitedListener := SharedLimitListener(listener, sem)
|
||||
|
||||
// Close the listener and ensure it does not accept new connections.
|
||||
err = limitedListener.Close()
|
||||
require.NoError(t, err, "failed to close listener")
|
||||
|
||||
conn, err := limitedListener.Accept()
|
||||
require.Error(t, err, "expected error on accept after listener closed")
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}
|
38
web/web.go
38
web/web.go
|
@ -49,7 +49,6 @@ import (
|
|||
toolkit_web "github.com/prometheus/exporter-toolkit/web"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/net/netutil"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
|
@ -59,6 +58,7 @@ import (
|
|||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/template"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
"github.com/prometheus/prometheus/util/netconnlimit"
|
||||
api_v1 "github.com/prometheus/prometheus/web/api/v1"
|
||||
"github.com/prometheus/prometheus/web/ui"
|
||||
)
|
||||
|
@ -244,7 +244,7 @@ type Options struct {
|
|||
Version *PrometheusVersion
|
||||
Flags map[string]string
|
||||
|
||||
ListenAddress string
|
||||
ListenAddresses []string
|
||||
CORSOrigin *regexp.Regexp
|
||||
ReadTimeout time.Duration
|
||||
MaxConnections int
|
||||
|
@ -334,7 +334,7 @@ func New(logger log.Logger, o *Options) *Handler {
|
|||
},
|
||||
o.Flags,
|
||||
api_v1.GlobalURLOptions{
|
||||
ListenAddress: o.ListenAddress,
|
||||
ListenAddress: o.ListenAddresses[0],
|
||||
Host: o.ExternalURL.Host,
|
||||
Scheme: o.ExternalURL.Scheme,
|
||||
},
|
||||
|
@ -566,15 +566,29 @@ func (h *Handler) Reload() <-chan chan error {
|
|||
return h.reloadCh
|
||||
}
|
||||
|
||||
// Listener creates the TCP listener for web requests.
|
||||
func (h *Handler) Listener() (net.Listener, error) {
|
||||
level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress)
|
||||
// Listeners creates the TCP listeners for web requests.
|
||||
func (h *Handler) Listeners() ([]net.Listener, error) {
|
||||
var listeners []net.Listener
|
||||
sem := netconnlimit.NewSharedSemaphore(h.options.MaxConnections)
|
||||
for _, address := range h.options.ListenAddresses {
|
||||
listener, err := h.Listener(address, sem)
|
||||
if err != nil {
|
||||
return listeners, err
|
||||
}
|
||||
listeners = append(listeners, listener)
|
||||
}
|
||||
return listeners, nil
|
||||
}
|
||||
|
||||
listener, err := net.Listen("tcp", h.options.ListenAddress)
|
||||
// Listener creates the TCP listener for web requests.
|
||||
func (h *Handler) Listener(address string, sem chan struct{}) (net.Listener, error) {
|
||||
level.Info(h.logger).Log("msg", "Start listening for connections", "address", address)
|
||||
|
||||
listener, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
return listener, err
|
||||
}
|
||||
listener = netutil.LimitListener(listener, h.options.MaxConnections)
|
||||
listener = netconnlimit.SharedLimitListener(listener, sem)
|
||||
|
||||
// Monitor incoming connections with conntrack.
|
||||
listener = conntrack.NewListener(listener,
|
||||
|
@ -585,10 +599,10 @@ func (h *Handler) Listener() (net.Listener, error) {
|
|||
}
|
||||
|
||||
// Run serves the HTTP endpoints.
|
||||
func (h *Handler) Run(ctx context.Context, listener net.Listener, webConfig string) error {
|
||||
if listener == nil {
|
||||
func (h *Handler) Run(ctx context.Context, listeners []net.Listener, webConfig string) error {
|
||||
if len(listeners) == 0 {
|
||||
var err error
|
||||
listener, err = h.Listener()
|
||||
listeners, err = h.Listeners()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -623,7 +637,7 @@ func (h *Handler) Run(ctx context.Context, listener net.Listener, webConfig stri
|
|||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- toolkit_web.Serve(listener, httpSrv, &toolkit_web.FlagConfig{WebConfigFile: &webConfig}, h.logger)
|
||||
errCh <- toolkit_web.ServeMultiple(listeners, httpSrv, &toolkit_web.FlagConfig{WebConfigFile: &webConfig}, h.logger)
|
||||
}()
|
||||
|
||||
select {
|
||||
|
|
208
web/web_test.go
208
web/web_test.go
|
@ -73,19 +73,19 @@ func TestReadyAndHealthy(t *testing.T) {
|
|||
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddress: port,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
TSDBDir: dbDir,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
EnableAdminAPI: true,
|
||||
ListenAddresses: []string{port},
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
TSDBDir: dbDir,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
EnableAdminAPI: true,
|
||||
ExternalURL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost" + port,
|
||||
|
@ -101,9 +101,9 @@ func TestReadyAndHealthy(t *testing.T) {
|
|||
|
||||
webHandler.config = &config.Config{}
|
||||
webHandler.notifier = ¬ifier.Manager{}
|
||||
l, err := webHandler.Listener()
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
panic(fmt.Sprintf("Unable to start web listeners: %s", err))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -198,19 +198,19 @@ func TestRoutePrefix(t *testing.T) {
|
|||
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddress: port,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
TSDBDir: dbDir,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
Storage: nil,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: nil,
|
||||
RuleManager: nil,
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/prometheus",
|
||||
EnableAdminAPI: true,
|
||||
ListenAddresses: []string{port},
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
TSDBDir: dbDir,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
Storage: nil,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: nil,
|
||||
RuleManager: nil,
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/prometheus",
|
||||
EnableAdminAPI: true,
|
||||
ExternalURL: &url.URL{
|
||||
Host: "localhost.localdomain" + port,
|
||||
Scheme: "http",
|
||||
|
@ -220,9 +220,9 @@ func TestRoutePrefix(t *testing.T) {
|
|||
opts.Flags = map[string]string{}
|
||||
|
||||
webHandler := New(nil, opts)
|
||||
l, err := webHandler.Listener()
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
panic(fmt.Sprintf("Unable to start web listeners: %s", err))
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
@ -299,8 +299,8 @@ func TestDebugHandler(t *testing.T) {
|
|||
{"/foo", "/bar/debug/pprof/goroutine", 404},
|
||||
} {
|
||||
opts := &Options{
|
||||
RoutePrefix: tc.prefix,
|
||||
ListenAddress: "somehost:9090",
|
||||
RoutePrefix: tc.prefix,
|
||||
ListenAddresses: []string{"somehost:9090"},
|
||||
ExternalURL: &url.URL{
|
||||
Host: "localhost.localdomain:9090",
|
||||
Scheme: "http",
|
||||
|
@ -324,8 +324,8 @@ func TestDebugHandler(t *testing.T) {
|
|||
func TestHTTPMetrics(t *testing.T) {
|
||||
t.Parallel()
|
||||
handler := New(nil, &Options{
|
||||
RoutePrefix: "/",
|
||||
ListenAddress: "somehost:9090",
|
||||
RoutePrefix: "/",
|
||||
ListenAddresses: []string{"somehost:9090"},
|
||||
ExternalURL: &url.URL{
|
||||
Host: "localhost.localdomain:9090",
|
||||
Scheme: "http",
|
||||
|
@ -381,18 +381,18 @@ func TestShutdownWithStaleConnection(t *testing.T) {
|
|||
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddress: port,
|
||||
ReadTimeout: timeout,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
TSDBDir: dbDir,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
ListenAddresses: []string{port},
|
||||
ReadTimeout: timeout,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
TSDBDir: dbDir,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
ExternalURL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost" + port,
|
||||
|
@ -408,9 +408,9 @@ func TestShutdownWithStaleConnection(t *testing.T) {
|
|||
|
||||
webHandler.config = &config.Config{}
|
||||
webHandler.notifier = &notifier.Manager{}
|
||||
l, err := webHandler.Listener()
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
panic(fmt.Sprintf("Unable to start web listeners: %s", err))
|
||||
}
|
||||
|
||||
closed := make(chan struct{})
|
||||
|
@ -448,7 +448,7 @@ func TestHandleMultipleQuitRequests(t *testing.T) {
|
|||
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddress: port,
|
||||
ListenAddresses: []string{port},
|
||||
MaxConnections: 512,
|
||||
EnableLifecycle: true,
|
||||
RoutePrefix: "/",
|
||||
|
@ -461,9 +461,9 @@ func TestHandleMultipleQuitRequests(t *testing.T) {
|
|||
webHandler := New(nil, opts)
|
||||
webHandler.config = &config.Config{}
|
||||
webHandler.notifier = &notifier.Manager{}
|
||||
l, err := webHandler.Listener()
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
panic(fmt.Sprintf("Unable to start web listeners: %s", err))
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
closed := make(chan struct{})
|
||||
|
@ -513,17 +513,17 @@ func TestAgentAPIEndPoints(t *testing.T) {
|
|||
port := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddress: port,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
EnableAdminAPI: true,
|
||||
ListenAddresses: []string{port},
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
EnableAdminAPI: true,
|
||||
ExternalURL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost" + port,
|
||||
|
@ -540,9 +540,9 @@ func TestAgentAPIEndPoints(t *testing.T) {
|
|||
webHandler.SetReady(true)
|
||||
webHandler.config = &config.Config{}
|
||||
webHandler.notifier = &notifier.Manager{}
|
||||
l, err := webHandler.Listener()
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
panic(fmt.Sprintf("Unable to start web listeners: %s", err))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -628,3 +628,83 @@ func cleanupSnapshot(t *testing.T, dbDir string, resp *http.Response) {
|
|||
require.NoError(t, os.Remove(filepath.Join(dbDir, "snapshots", snapshot.Data.Name)))
|
||||
require.NoError(t, os.Remove(filepath.Join(dbDir, "snapshots")))
|
||||
}
|
||||
|
||||
func TestMultipleListenAddresses(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbDir := t.TempDir()
|
||||
|
||||
db, err := tsdb.Open(dbDir, nil, nil, nil, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
|
||||
// Create multiple ports for testing multiple ListenAddresses
|
||||
port1 := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
port2 := fmt.Sprintf(":%d", testutil.RandomUnprivilegedPort(t))
|
||||
|
||||
opts := &Options{
|
||||
ListenAddresses: []string{port1, port2},
|
||||
ReadTimeout: 30 * time.Second,
|
||||
MaxConnections: 512,
|
||||
Context: nil,
|
||||
Storage: nil,
|
||||
LocalStorage: &dbAdapter{db},
|
||||
TSDBDir: dbDir,
|
||||
QueryEngine: nil,
|
||||
ScrapeManager: &scrape.Manager{},
|
||||
RuleManager: &rules.Manager{},
|
||||
Notifier: nil,
|
||||
RoutePrefix: "/",
|
||||
EnableAdminAPI: true,
|
||||
ExternalURL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost" + port1,
|
||||
Path: "/",
|
||||
},
|
||||
Version: &PrometheusVersion{},
|
||||
Gatherer: prometheus.DefaultGatherer,
|
||||
}
|
||||
|
||||
opts.Flags = map[string]string{}
|
||||
|
||||
webHandler := New(nil, opts)
|
||||
|
||||
webHandler.config = &config.Config{}
|
||||
webHandler.notifier = ¬ifier.Manager{}
|
||||
l, err := webHandler.Listeners()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Unable to start web listener: %s", err))
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
err := webHandler.Run(ctx, l, "")
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Can't start web handler:%s", err))
|
||||
}
|
||||
}()
|
||||
|
||||
// Give some time for the web goroutine to run since we need the server
|
||||
// to be up before starting tests.
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
// Set to ready.
|
||||
webHandler.SetReady(true)
|
||||
|
||||
for _, port := range []string{port1, port2} {
|
||||
baseURL := "http://localhost" + port
|
||||
|
||||
resp, err := http.Get(baseURL + "/-/healthy")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
cleanupTestResponse(t, resp)
|
||||
|
||||
resp, err = http.Get(baseURL + "/-/ready")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
cleanupTestResponse(t, resp)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue