mirror of https://github.com/prometheus/prometheus.git

Merge 7d6406ba9f into 61aa82865d
commit 5d83b20c08

@@ -139,10 +139,12 @@ var (
 	agentOnlyFlags, serverOnlyFlags []string
 )
 
+var registry = prometheus.NewRegistry()
+
 func init() {
 	// This can be removed when the default validation scheme in common is updated.
 	model.NameValidationScheme = model.UTF8Validation
-	prometheus.MustRegister(versioncollector.NewCollector(strings.ReplaceAll(appName, "-", "_")))
+	registry.MustRegister(versioncollector.NewCollector(strings.ReplaceAll(appName, "-", "_")))
 
 	var err error
 	defaultRetentionDuration, err = model.ParseDuration(defaultRetentionString)

@@ -293,26 +295,24 @@ func main() {
 		runtime.SetMutexProfileFraction(20)
 	}
 
-	// Unregister the default GoCollector, and reregister with our defaults.
-	if prometheus.Unregister(collectors.NewGoCollector()) {
-		prometheus.MustRegister(
-			collectors.NewGoCollector(
-				collectors.WithGoCollectorRuntimeMetrics(
-					collectors.MetricsGC,
-					collectors.MetricsScheduler,
-					collectors.GoRuntimeMetricsRule{Matcher: goregexp.MustCompile(`^/sync/mutex/wait/total:seconds$`)},
-				),
-			),
-		)
-	}
+	// Register with NewGoCollector and NewProcessCollector.
+	registry.MustRegister(
+		collectors.NewGoCollector(
+			collectors.WithGoCollectorRuntimeMetrics(
+				collectors.MetricsGC,
+				collectors.MetricsScheduler,
+				collectors.GoRuntimeMetricsRule{Matcher: goregexp.MustCompile(`^/sync/mutex/wait/total:seconds$`)},
+			),
+		),
+		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
 
 	cfg := flagConfig{
 		notifier: notifier.Options{
-			Registerer: prometheus.DefaultRegisterer,
+			Registerer: registry,
 		},
 		web: web.Options{
-			Registerer: prometheus.DefaultRegisterer,
-			Gatherer:   prometheus.DefaultGatherer,
+			Registerer: registry,
+			Gatherer:   registry,
 		},
 		promslogConfig: promslog.Config{},
 	}

@@ -545,7 +545,7 @@ func main() {
 	logger := promslog.New(&cfg.promslogConfig)
 	slog.SetDefault(logger)
 
-	notifs := notifications.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer)
+	notifs := notifications.NewNotifications(cfg.maxNotificationsSubscribers, registry)
 	cfg.web.NotificationsSub = notifs.Sub
 	cfg.web.NotificationsGetter = notifs.Get
 	notifs.AddNotification(notifications.StartingUp)

@@ -699,7 +699,7 @@ func main() {
 	var (
 		localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
 		scraper       = &readyScrapeManager{}
-		remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
+		remoteStorage = remote.NewStorage(logger.With("component", "remote"), registry, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)
 

@@ -720,25 +720,25 @@ func main() {
 		// can only register metrics specific to a SD instance.
 		// Kubernetes client metrics are the same for the whole process -
 		// they are not specific to an SD instance.
-		err = discovery.RegisterK8sClientMetricsWithPrometheus(prometheus.DefaultRegisterer)
+		err = discovery.RegisterK8sClientMetricsWithPrometheus(registry)
 		if err != nil {
 			logger.Error("failed to register Kubernetes client metrics", "err", err)
 			os.Exit(1)
 		}
 
-		sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
+		sdMetrics, err := discovery.CreateAndRegisterSDMetrics(registry)
 		if err != nil {
 			logger.Error("failed to register service discovery metrics", "err", err)
 			os.Exit(1)
 		}
 
-		discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"))
+		discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), registry, sdMetrics, discovery.Name("scrape"))
 		if discoveryManagerScrape == nil {
 			logger.Error("failed to create a discovery manager scrape")
 			os.Exit(1)
 		}
 
-		discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"))
+		discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), registry, sdMetrics, discovery.Name("notify"))
 		if discoveryManagerNotify == nil {
 			logger.Error("failed to create a discovery manager notify")
 			os.Exit(1)

@@ -749,7 +749,7 @@ func main() {
 		logger.With("component", "scrape manager"),
 		logging.NewJSONFileLogger,
 		fanoutStorage,
-		prometheus.DefaultRegisterer,
+		registry,
 	)
 	if err != nil {
 		logger.Error("failed to create a scrape manager", "err", err)

@@ -789,7 +789,7 @@ func main() {
 	if !agentMode {
 		opts := promql.EngineOpts{
 			Logger:             logger.With("component", "query engine"),
-			Reg:                prometheus.DefaultRegisterer,
+			Reg:                registry,
 			MaxSamples:         cfg.queryMaxSamples,
 			Timeout:            time.Duration(cfg.queryTimeout),
 			ActiveQueryTracker: promql.NewActiveQueryTracker(localStoragePath, cfg.queryConcurrency, logger.With("component", "activeQueryTracker")),

@@ -812,7 +812,7 @@ func main() {
 			NotifyFunc:      rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
 			Context:         ctxRule,
 			ExternalURL:     cfg.web.ExternalURL,
-			Registerer:      prometheus.DefaultRegisterer,
+			Registerer:      registry,
 			Logger:          logger.With("component", "rule manager"),
 			OutageTolerance: time.Duration(cfg.outageTolerance),
 			ForGracePeriod:  time.Duration(cfg.forGracePeriod),

@@ -966,8 +966,8 @@ func main() {
 		},
 	}
 
-	prometheus.MustRegister(configSuccess)
-	prometheus.MustRegister(configSuccessTime)
+	registry.MustRegister(configSuccess)
+	registry.MustRegister(configSuccessTime)
 
 	// Start all components while we wait for TSDB to open but only load
 	// initial config and mark ourselves as ready after it completed.

@@ -1237,7 +1237,7 @@ func main() {
 			}
 		}
 
-		db, err := openDBWithMetrics(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.getStats())
+		db, err := openDBWithMetrics(localStoragePath, logger, registry, &opts, localStorage.getStats())
 		if err != nil {
 			return fmt.Errorf("opening storage failed: %w", err)
 		}

@@ -1289,7 +1289,7 @@ func main() {
 		}
 		db, err := agent.Open(
 			logger,
-			prometheus.DefaultRegisterer,
+			registry,
 			remoteStorage,
 			localStoragePath,
 			&opts,
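The pattern applied throughout this file, a package-level registry replacing prometheus.DefaultRegisterer, reduces to the following self-contained sketch. It is illustrative only, not code from this commit, though it uses the same client_golang calls the hunks above use.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
)

func main() {
	// A dedicated registry instead of the process-global DefaultRegisterer.
	reg := prometheus.NewRegistry()

	// The default registry carries Go and process collectors implicitly;
	// a custom registry has to opt in, which is what the hunk above does.
	reg.MustRegister(
		collectors.NewGoCollector(),
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
	)

	// Gather returns only what was explicitly registered here.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(mfs), "metric families")
}

Because the registry is now an explicit value, every component that records metrics must receive it as a parameter, which is why a Registerer is threaded through so many constructors in the rest of this diff.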
@@ -348,7 +348,7 @@ func main() {
 
 	switch parsedCmd {
 	case sdCheckCmd.FullCommand():
-		os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, prometheus.DefaultRegisterer))
+		os.Exit(CheckSD(*sdConfigFile, *sdJobName, *sdTimeout, prometheus.NewRegistry()))
 
 	case checkConfigCmd.FullCommand():
 		os.Exit(CheckConfig(*agentMode, *checkConfigSyntaxOnly, newConfigLintConfig(*checkConfigLint, *checkConfigLintFatal, *checkConfigIgnoreUnknownFields, model.Duration(*checkLookbackDelta)), *configFiles...))
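Handing CheckSD a fresh prometheus.NewRegistry() per invocation keeps the SD check's metrics out of any shared state. A small illustrative sketch (the counter name is made up) of why throwaway registries are safe to repeat where the shared default registry would reject a second registration of the same name:

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// Two independent registries may each hold a collector with the same
	// name; prometheus.DefaultRegisterer, being process-wide, would
	// reject the second one.
	for _, r := range []*prometheus.Registry{prometheus.NewRegistry(), prometheus.NewRegistry()} {
		r.MustRegister(prometheus.NewCounter(prometheus.CounterOpts{
			Name: "demo_checks_total",
			Help: "Illustrative only.",
		}))
	}
}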
discovery/zookeeper/metrics.go (new file, 69 lines)

@@ -0,0 +1,69 @@
+// Copyright 2025 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zookeeper
+
+import (
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/prometheus/prometheus/discovery"
+)
+
+var registerMetricsOnce sync.Once
+
+type zookeeperMetrics struct {
+	// The total number of ZooKeeper failures.
+	failureCounter prometheus.Counter
+	// The current number of Zookeeper watcher goroutines.
+	numWatchers prometheus.Gauge
+}
+
+// Create and register metrics.
+func newDiscovererMetrics(reg prometheus.Registerer, _ discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
+	m := &zookeeperMetrics{
+		failureCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: "prometheus",
+			Subsystem: "treecache",
+			Name:      "zookeeper_failures_total",
+			Help:      "The total number of ZooKeeper failures.",
+		}),
+		numWatchers: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace: "prometheus",
+			Subsystem: "treecache",
+			Name:      "watcher_goroutines",
+			Help:      "The current number of watcher goroutines.",
+		}),
+	}
+	// Register zookeeper metrics once for both ServerSet and Nerve SD.
+	registerMetricsOnce.Do(func() {
+		reg.MustRegister(m.failureCounter, m.numWatchers)
+	})
+
+	return &zookeeperMetrics{
+		failureCounter: m.failureCounter,
+		numWatchers:    m.numWatchers,
+	}
+}
+
+// Register implements discovery.DiscovererMetrics.
+func (m *zookeeperMetrics) Register() error {
+	// return m.metricRegisterer.RegisterMetrics()
+	return nil
+}
+
+// Unregister implements discovery.DiscovererMetrics.
+func (m *zookeeperMetrics) Unregister() {
+	// m.metricRegisterer.UnregisterMetrics()
+}
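Note the registerMetricsOnce guard: only the first Registerer ever passed to newDiscovererMetrics receives the treecache collectors, while later calls build fresh, unregistered copies. A self-contained sketch of that behavior (registerOnce and demo_total are illustrative names, not part of the commit):

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var once sync.Once

// registerOnce mirrors the guard above: only the first registry passed in
// ever receives the collectors.
func registerOnce(reg prometheus.Registerer, cs ...prometheus.Collector) {
	once.Do(func() { reg.MustRegister(cs...) })
}

func main() {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_total", Help: "Illustrative."})
	reg1, reg2 := prometheus.NewRegistry(), prometheus.NewRegistry()
	registerOnce(reg1, c)
	registerOnce(reg2, c) // no-op: the Once has already fired

	mfs, _ := reg1.Gather()
	fmt.Println(len(mfs)) // 1
	mfs, _ = reg2.Gather()
	fmt.Println(len(mfs)) // 0
}

With a single process-wide registry, as wired up in main.go above, this is harmless; a second registry would silently receive nothing.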
@@ -59,8 +59,8 @@ type ServersetSDConfig struct {
 }
 
 // NewDiscovererMetrics implements discovery.Config.
-func (*ServersetSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, _ discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
-	return &discovery.NoopDiscovererMetrics{}
+func (*ServersetSDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
+	return newDiscovererMetrics(reg, rmi)
 }
 
 // Name returns the name of the Config.

@@ -68,7 +68,7 @@ func (*ServersetSDConfig) Name() string { return "serverset" }
 
 // NewDiscoverer returns a Discoverer for the Config.
 func (c *ServersetSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
-	return NewServersetDiscovery(c, opts.Logger)
+	return NewServersetDiscovery(c, opts.Logger, opts.Metrics)
 }
 
 // UnmarshalYAML implements the yaml.Unmarshaler interface.

@@ -101,8 +101,8 @@ type NerveSDConfig struct {
 }
 
 // NewDiscovererMetrics implements discovery.Config.
-func (*NerveSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, _ discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
-	return &discovery.NoopDiscovererMetrics{}
+func (*NerveSDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
+	return newDiscovererMetrics(reg, rmi)
 }
 
 // Name returns the name of the Config.

@@ -110,7 +110,7 @@ func (*NerveSDConfig) Name() string { return "nerve" }
 
 // NewDiscoverer returns a Discoverer for the Config.
 func (c *NerveSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
-	return NewNerveDiscovery(c, opts.Logger)
+	return NewNerveDiscovery(c, opts.Logger, opts.Metrics)
 }
 
 // UnmarshalYAML implements the yaml.Unmarshaler interface.

@@ -148,16 +148,18 @@ type Discovery struct {
 
 	parse  func(data []byte, path string) (model.LabelSet, error)
 	logger *slog.Logger
+
+	metrics *zookeeperMetrics
 }
 
 // NewNerveDiscovery returns a new Discovery for the given Nerve config.
-func NewNerveDiscovery(conf *NerveSDConfig, logger *slog.Logger) (*Discovery, error) {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
+func NewNerveDiscovery(conf *NerveSDConfig, logger *slog.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember, metrics)
 }
 
 // NewServersetDiscovery returns a new Discovery for the given serverset config.
-func NewServersetDiscovery(conf *ServersetSDConfig, logger *slog.Logger) (*Discovery, error) {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
+func NewServersetDiscovery(conf *ServersetSDConfig, logger *slog.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember, metrics)
 }
 
 // NewDiscovery returns a new discovery along Zookeeper parses with

@@ -168,6 +170,7 @@ func NewDiscovery(
 	paths []string,
 	logger *slog.Logger,
 	pf func(data []byte, path string) (model.LabelSet, error),
+	metrics discovery.DiscovererMetrics,
 ) (*Discovery, error) {
 	if logger == nil {
 		logger = promslog.NewNopLogger()

@@ -188,11 +191,16 @@ func NewDiscovery(
 		sources: map[string]*targetgroup.Group{},
 		parse:   pf,
 		logger:  logger,
+		metrics: metrics.(*zookeeperMetrics),
 	}
 	for _, path := range paths {
 		pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent)
 		sd.pathUpdates = append(sd.pathUpdates, pathUpdate)
-		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger))
+		// Pass our metrics to the treecache.
+		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(
+			conn, path, pathUpdate, logger,
+			sd.metrics.failureCounter, sd.metrics.numWatchers,
+		))
 	}
 	return sd, nil
 }
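NewDiscovery downcasts with metrics.(*zookeeperMetrics), which panics if a caller supplies any other discovery.DiscovererMetrics implementation or a nil interface (the updated test below passes nil). A comma-ok variant, shown here as a hypothetical helper rather than what the commit does, would surface the mismatch as an error instead:

// asZookeeperMetrics assumes package zookeeper's existing imports plus "fmt".
func asZookeeperMetrics(metrics discovery.DiscovererMetrics) (*zookeeperMetrics, error) {
	m, ok := metrics.(*zookeeperMetrics)
	if !ok {
		return nil, fmt.Errorf("invalid discovery metrics type %T", metrics)
	}
	return m, nil
}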
@@ -31,6 +31,6 @@ func TestNewDiscoveryError(t *testing.T) {
 		[]string{"unreachable.test"},
 		time.Second, []string{"/"},
 		nil,
-		func(_ []byte, _ string) (model.LabelSet, error) { return nil, nil })
+		func(_ []byte, _ string) (model.LabelSet, error) { return nil, nil }, nil)
 	require.Error(t, err)
 }
@@ -23,6 +23,7 @@ import (
 	"net/http/httptrace"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/gogo/protobuf/proto"

@@ -109,11 +110,9 @@ var (
 		},
 		[]string{remoteName, endpoint, "response_type"},
 	)
-)
-
-func init() {
-	prometheus.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration)
-}
+
+	once sync.Once
+)
 
 // Client allows reading and writing from/to a remote HTTP endpoint.
 type Client struct {

@@ -154,8 +153,19 @@ type ReadClient interface {
 	Read(ctx context.Context, query *prompb.Query, sortSeries bool) (storage.SeriesSet, error)
 }
 
+func registerRemoteReadMetrics(registry prometheus.Registerer) {
+	once.Do(func() {
+		registry.MustRegister(remoteReadQueriesTotal, remoteReadQueries, remoteReadQueryDuration)
+	})
+}
+
 // NewReadClient creates a new client for remote read.
-func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) {
+func NewReadClient(name string, conf *ClientConfig, reg prometheus.Registerer) (ReadClient, error) {
+	// metrics are registered with the custom registry
+	if reg != nil {
+		registerRemoteReadMetrics(reg)
+	}
+
 	httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage_read_client")
 	if err != nil {
 		return nil, err
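As in the new zookeeper metrics file, a package-level once ties these read metrics to whichever Registerer reaches registerRemoteReadMetrics first, and NewReadClient skips registration entirely for a nil Registerer. An alternative shape (a sketch, not this commit's approach) registers on every call and tolerates duplicates via prometheus.AlreadyRegisteredError:

// registerTolerant assumes imports "errors" and client_golang's prometheus.
// It registers each collector and reuses the existing one when the registry
// reports an identical collector is already present.
func registerTolerant(reg prometheus.Registerer, cs ...prometheus.Collector) {
	for _, c := range cs {
		if err := reg.Register(c); err != nil {
			var are prometheus.AlreadyRegisteredError
			if !errors.As(err, &are) {
				panic(err) // a genuine conflict, not a duplicate
			}
		}
	}
}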
@@ -344,7 +344,7 @@ func TestReadClient(t *testing.T) {
 				Timeout:          model.Duration(5 * time.Second),
 				ChunkedReadLimit: config.DefaultChunkedReadLimit,
 			}
-			c, err := NewReadClient("test", conf)
+			c, err := NewReadClient("test", conf, nil)
 			require.NoError(t, err)
 
 			query := &prompb.Query{}
@@ -22,11 +22,10 @@ import (
 	"sync"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.uber.org/atomic"
 )
 
-var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{
+var noReferenceReleases = prometheus.NewCounter(prometheus.CounterOpts{
 	Namespace: namespace,
 	Subsystem: subsystem,
 	Name:      "string_interner_zero_reference_releases_total",

@@ -48,7 +47,11 @@ func newEntry(s string) *entry {
 	return &entry{s: s}
 }
 
-func newPool() *pool {
+func newPool(reg prometheus.Registerer) *pool {
+	if reg != nil {
+		reg.MustRegister(noReferenceReleases)
+	}
+
 	return &pool{
 		pool: map[string]*entry{},
 	}
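The promauto-to-prometheus switch is the crux of this hunk: promauto constructors register the new metric on prometheus.DefaultRegisterer as a side effect (panicking on failure), while the plain prometheus constructors only build the metric and leave registration to the caller, here newPool. A minimal contrast, with illustrative names:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// Created AND registered on the global default registry, in one call.
	_ = promauto.NewCounter(prometheus.CounterOpts{Name: "auto_total", Help: "Illustrative."})

	// Created only; registration is a separate, caller-controlled step.
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "manual_total", Help: "Illustrative."})
	prometheus.NewRegistry().MustRegister(c)
}

One caveat visible in the diff itself: noReferenceReleases remains a package-level variable, so constructing two pools against the same registry would panic in MustRegister; the tests sidestep that by passing nil.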
@@ -26,7 +26,7 @@ import (
 )
 
 func TestIntern(t *testing.T) {
-	interner := newPool()
+	interner := newPool(nil)
 	testString := "TestIntern"
 	interner.intern(testString)
 	interned, ok := interner.pool[testString]

@@ -36,7 +36,7 @@ func TestIntern(t *testing.T) {
 }
 
 func TestIntern_MultiRef(t *testing.T) {
-	interner := newPool()
+	interner := newPool(nil)
 	testString := "TestIntern_MultiRef"
 
 	interner.intern(testString)

@@ -53,7 +53,7 @@ func TestIntern_MultiRef(t *testing.T) {
 }
 
 func TestIntern_DeleteRef(t *testing.T) {
-	interner := newPool()
+	interner := newPool(nil)
 	testString := "TestIntern_DeleteRef"
 
 	interner.intern(testString)

@@ -68,7 +68,7 @@ func TestIntern_DeleteRef(t *testing.T) {
 }
 
 func TestIntern_MultiRef_Concurrent(t *testing.T) {
-	interner := newPool()
+	interner := newPool(nil)
 	testString := "TestIntern_MultiRef_Concurrent"
 
 	interner.intern(testString)
@@ -321,7 +321,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
 func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
 	dir := t.TempDir()
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(nil), newHighestTimestampMetric(), nil, false, false, protoMsg)
 
 	return m
 }

@@ -774,7 +774,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
 		}
 	)
 
-	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(nil), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
 	m.StoreSeries(fakeSeries, 0)
 
 	// Attempt to samples while the manager is running. We immediately stop the

@@ -1391,7 +1391,7 @@ func BenchmarkStoreSeries(b *testing.B) {
 			cfg := config.DefaultQueueConfig
 			mcfg := config.DefaultMetadataConfig
 			metrics := newQueueManagerMetrics(nil, "", "")
-			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(nil), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
 			m.externalLabels = tc.externalLabels
 			m.relabelConfigs = tc.relabelConfigs
 

@@ -1431,7 +1431,7 @@ func BenchmarkStartup(b *testing.B) {
 		// todo: test with new proto type(s)
 		m := NewQueueManager(metrics, watcherMetrics, nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
-			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+			cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(nil), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
 		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
 		m.watcher.MaxSegment = segments[len(segments)-2]
 		m.watcher.SetMetrics()
@@ -65,12 +65,16 @@ type Storage struct {
 
 // NewStorage returns a remote.Storage.
 func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
+	// register remote storage metrics in custom registry
+	if reg != nil {
+		reg.MustRegister(samplesIn, histogramsIn, exemplarsIn)
+	}
 	if l == nil {
 		l = promslog.NewNopLogger()
 	}
 	deduper := logging.Dedupe(l, 1*time.Minute)
 	logger := slog.New(deduper)
 
 	s := &Storage{
 		logger:  logger,
 		deduper: deduper,

@@ -122,7 +126,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
 			ChunkedReadLimit: rrConf.ChunkedReadLimit,
 			HTTPClientConfig: rrConf.HTTPClientConfig,
 			Headers:          rrConf.Headers,
-		})
+		}, s.rws.reg)
 		if err != nil {
 			return err
 		}
@@ -22,7 +22,6 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/promslog"
 
 	"github.com/prometheus/prometheus/config"

@@ -35,19 +34,19 @@ import (
 )
 
 var (
-	samplesIn = promauto.NewCounter(prometheus.CounterOpts{
+	samplesIn = prometheus.NewCounter(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "samples_in_total",
 		Help:      "Samples in to remote storage, compare to samples out for queue managers.",
 	})
-	exemplarsIn = promauto.NewCounter(prometheus.CounterOpts{
+	exemplarsIn = prometheus.NewCounter(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "exemplars_in_total",
 		Help:      "Exemplars in to remote storage, compare to exemplars out for queue managers.",
 	})
-	histogramsIn = promauto.NewCounter(prometheus.CounterOpts{
+	histogramsIn = prometheus.NewCounter(prometheus.CounterOpts{
 		Namespace: namespace,
 		Subsystem: subsystem,
 		Name:      "histograms_in_total",

@@ -90,7 +89,7 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
 		flushDeadline:    flushDeadline,
 		samplesIn:        newEWMARate(ewmaWeight, shardUpdateDuration),
 		dir:              dir,
-		interner:         newPool(),
+		interner:         newPool(reg),
 		scraper:          sm,
 		quit:             make(chan struct{}),
 		highestTimestamp: &maxTimestamp{
@@ -24,6 +24,7 @@ import (
 	"net/url"
 	"sort"
 	"strings"
+	"sync"
 	text_template "text/template"
 	"time"
 

@@ -50,11 +51,15 @@ var (
 	})
 
 	errNaNOrInf = errors.New("value is NaN or Inf")
+
+	once sync.Once // Ensure registration happens only once
 )
 
-func init() {
-	prometheus.MustRegister(templateTextExpansionFailures)
-	prometheus.MustRegister(templateTextExpansionTotal)
+func RegisterTemplateMetrics(registry prometheus.Registerer) {
+	once.Do(func() {
+		registry.MustRegister(templateTextExpansionFailures)
+		registry.MustRegister(templateTextExpansionTotal)
+	})
 }
 
 // A version of vector that's easier to use from templates.
@@ -26,26 +26,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-var (
-	failureCounter = prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: "prometheus",
-		Subsystem: "treecache",
-		Name:      "zookeeper_failures_total",
-		Help:      "The total number of ZooKeeper failures.",
-	})
-	numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
-		Namespace: "prometheus",
-		Subsystem: "treecache",
-		Name:      "watcher_goroutines",
-		Help:      "The current number of watcher goroutines.",
-	})
-)
-
-func init() {
-	prometheus.MustRegister(failureCounter)
-	prometheus.MustRegister(numWatchers)
-}
-
 // ZookeeperLogger wraps a *slog.Logger into a zk.Logger.
 type ZookeeperLogger struct {
 	logger *slog.Logger

@@ -72,6 +52,9 @@ type ZookeeperTreeCache struct {
 	head *zookeeperTreeCacheNode
 
 	logger *slog.Logger
+
+	failureCounter prometheus.Counter
+	numWatchers    prometheus.Gauge
 }
 
 // A ZookeeperTreeCacheEvent models a Zookeeper event for a path.

@@ -89,21 +72,22 @@ type zookeeperTreeCacheNode struct {
 }
 
 // NewZookeeperTreeCache creates a new ZookeeperTreeCache for a given path.
-func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent, logger *slog.Logger) *ZookeeperTreeCache {
+func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent, logger *slog.Logger, failureCounter prometheus.Counter, numWatchers prometheus.Gauge) *ZookeeperTreeCache {
 	tc := &ZookeeperTreeCache{
-		conn:   conn,
-		prefix: path,
-		events: events,
-		stop:   make(chan struct{}),
-		wg:     &sync.WaitGroup{},
-
-		logger: logger,
+		conn:           conn,
+		prefix:         path,
+		events:         events,
+		stop:           make(chan struct{}),
+		wg:             &sync.WaitGroup{},
+		logger:         logger,
+		failureCounter: failureCounter,
+		numWatchers:    numWatchers,
 	}
 	tc.head = &zookeeperTreeCacheNode{
 		events:   make(chan zk.Event),
 		children: map[string]*zookeeperTreeCacheNode{},
 		done:     make(chan struct{}, 1),
-		stopped:  true, // Set head's stop to be true so that recursiveDelete will not stop the head node.
+		stopped:  true, // head node starts stopped
 	}
 	tc.wg.Add(1)
 	go tc.loop(path)

@@ -134,7 +118,7 @@ func (tc *ZookeeperTreeCache) loop(path string) {
 	retryChan := make(chan struct{})
 
 	failure := func() {
-		failureCounter.Inc()
+		tc.failureCounter.Inc()
 		failureMode = true
 		time.AfterFunc(time.Second*10, func() {
 			retryChan <- struct{}{}

@@ -267,7 +251,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
 
 	tc.wg.Add(1)
 	go func() {
-		numWatchers.Inc()
+		tc.numWatchers.Inc() // use the field instead of the global
 		// Pass up zookeeper events, until the node is deleted.
 		select {
 		case event := <-dataWatcher:

@@ -276,7 +260,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
 			node.events <- event
 		case <-node.done:
 		}
-		numWatchers.Dec()
+		tc.numWatchers.Dec() // same here
 		tc.wg.Done()
 	}()
 	return nil
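The treecache changes are the dependency-injection counterpart of the registry work: the counter and gauge now arrive through the constructor instead of living as package globals registered in init(). In general terms (the type and names below are illustrative, not from the commit; assumes client_golang's prometheus package is imported):

// A component that takes its metrics as dependencies; the caller decides
// where, or whether, they are registered.
type cache struct {
	failures prometheus.Counter
}

func newCache(failures prometheus.Counter) *cache {
	return &cache{failures: failures}
}

func (c *cache) onFailure() { c.failures.Inc() }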
@@ -306,6 +306,11 @@ func New(logger *slog.Logger, o *Options) *Handler {
 		logger = promslog.NewNopLogger()
 	}
 
+	// Register the template metrics
+	if o.Registerer != nil {
+		template.RegisterTemplateMetrics(o.Registerer)
+	}
+
 	m := newMetrics(o.Registerer)
 	router := route.New().
 		WithInstrumentation(m.instrumentHandler).

@@ -433,7 +438,7 @@ func New(logger *slog.Logger, o *Options) *Handler {
 	})
 
 	router.Get("/version", h.version)
-	router.Get("/metrics", promhttp.Handler().ServeHTTP)
+	router.Get("/metrics", promhttp.HandlerFor(o.Gatherer, promhttp.HandlerOpts{}).ServeHTTP)
 
 	router.Get("/federate", readyf(httputil.CompressionHandler{
 		Handler: http.HandlerFunc(h.federation),
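promhttp.Handler() serves prometheus.DefaultGatherer (wrapped by promhttp.InstrumentMetricHandler over the default registry, per client_golang's documentation), so switching to promhttp.HandlerFor(o.Gatherer, ...) is what finally points /metrics at the custom registry. The two forms side by side, as an illustrative program rather than code from this commit:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Roughly what promhttp.Handler() gives you: the global default
	// gatherer, with the handler's own instrumentation registered on the
	// default registry.
	defaultH := promhttp.InstrumentMetricHandler(
		prometheus.DefaultRegisterer,
		promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}),
	)

	// The commit's form: serve exactly what one explicit registry holds.
	reg := prometheus.NewRegistry()
	customH := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})

	http.Handle("/metrics-default", defaultH)
	http.Handle("/metrics", customH)
	_ = http.ListenAndServe(":8080", nil)
}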