Mirror of https://github.com/prometheus/prometheus.git (synced 2025-03-05 20:59:13 -08:00)
prw2: Split PRW2.0 from metadata-wal-records feature (#16030)
Rationale:
* metadata-wal-records might be deprecated and replaced going forward: https://github.com/prometheus/prometheus/issues/15911
* PRW 2.0 works fine without metadata (although it then sends untyped metrics, as expected).

Signed-off-by: bwplotka <bwplotka@gmail.com>
parent 733a5e9eb4
commit de23a9667c
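The visible API effect of the split is that remote.NewStorage (and, internally, NewWriteStorage) drops its trailing metadata-in-WAL boolean. Below is a minimal caller-side sketch of the new signature; the parameter values are illustrative and loosely modeled on the tests in this diff (the registry, WAL directory, and flush deadline are assumptions, not taken from this commit).

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	// Start-time callback, as used by the tests in this diff.
	startTime := func() (int64, error) { return 0, nil }

	// Before this commit the constructor ended with an extra bool
	// (e.g. cfg.scrape.AppendMetadata in main.go); after it, the flag is gone.
	rs := remote.NewStorage(promslog.NewNopLogger(), prometheus.NewRegistry(), startTime,
		"data/", 30*time.Second, nil) // "data/" is an illustrative WAL directory; nil scrape manager as in the tests

	defer rs.Close()
}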
@@ -231,7 +231,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
 		logger.Info("Experimental additional scrape metrics enabled")
 	case "metadata-wal-records":
 		c.scrape.AppendMetadata = true
-		logger.Info("Experimental metadata records in WAL enabled, required for remote write 2.0")
+		logger.Info("Experimental metadata records in WAL enabled")
 	case "promql-per-step-stats":
 		c.enablePerStepStats = true
 		logger.Info("Experimental per-step statistics reporting")
@@ -699,7 +699,7 @@ func main() {
 	var (
 		localStorage = &readyStorage{stats: tsdb.NewDBStats()}
 		scraper = &readyScrapeManager{}
-		remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata)
+		remoteStorage = remote.NewStorage(logger.With("component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
 		fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
 	)
 
@@ -114,8 +114,7 @@ Fall back to serving the old (Prometheus 2.x) web UI instead of the new UI. The
 When enabled, Prometheus will store metadata in-memory and keep track of
 metadata changes as WAL records on a per-series basis.
 
-This must be used if
-you are also using remote write 2.0 as it will only gather metadata from the WAL.
+This must be used if you would like to send metadata using the new remote write 2.0.
 
 ## Delay compaction start time
 
@@ -135,7 +135,7 @@ func TestBasicContentNegotiation(t *testing.T) {
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			dir := t.TempDir()
-			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 			defer s.Close()
 
 			var (
@@ -243,7 +243,7 @@ func TestSampleDelivery(t *testing.T) {
 	} {
 		t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
 			dir := t.TempDir()
-			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 			defer s.Close()
 
 			var (
@@ -362,7 +362,7 @@ func TestMetadataDelivery(t *testing.T) {
 
 func TestWALMetadataDelivery(t *testing.T) {
 	dir := t.TempDir()
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 	defer s.Close()
 
 	cfg := config.DefaultQueueConfig
@@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run("", func(t *testing.T) {
-			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+			s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 			conf := &config.Config{
 				GlobalConfig: config.DefaultGlobalConfig,
 				RemoteReadConfigs: tc.cfgs,
@@ -64,7 +64,7 @@ type Storage struct {
 }
 
 // NewStorage returns a remote.Storage.
-func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage {
+func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
 	if l == nil {
 		l = promslog.NewNopLogger()
 	}
@@ -76,7 +76,7 @@ func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeC
 		deduper: deduper,
 		localStartTimeCallback: stCallback,
 	}
-	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
+	s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
 	return s
 }
 
@@ -29,7 +29,7 @@ import (
 func TestStorageLifecycle(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
 func TestUpdateRemoteReadConfigs(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
@@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
 func TestFilterExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{
@@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
 func TestIgnoreExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
 
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{
@@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
 // ApplyConfig runs concurrently with Notify
 // See https://github.com/prometheus/prometheus/issues/12747
 func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
-	s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false)
+	s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil)
 
 	var wg sync.WaitGroup
 	wg.Add(2000)
@@ -15,7 +15,6 @@ package remote
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"log/slog"
 	"math"
@@ -67,7 +66,6 @@ type WriteStorage struct {
 	externalLabels labels.Labels
 	dir string
 	queues map[string]*QueueManager
-	metadataInWAL bool
 	samplesIn *ewmaRate
 	flushDeadline time.Duration
 	interner *pool
@@ -79,7 +77,7 @@ type WriteStorage struct {
 }
 
 // NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
+func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
 	if logger == nil {
 		logger = promslog.NewNopLogger()
 	}
@@ -95,7 +93,6 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
 		interner: newPool(),
 		scraper: sm,
 		quit: make(chan struct{}),
-		metadataInWAL: metadataInWal,
 		highestTimestamp: &maxTimestamp{
 			Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
 				Namespace: namespace,
@@ -149,9 +146,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 	newQueues := make(map[string]*QueueManager)
 	newHashes := []string{}
 	for _, rwConf := range conf.RemoteWriteConfigs {
-		if rwConf.ProtobufMessage == config.RemoteWriteProtoMsgV2 && !rws.metadataInWAL {
-			return errors.New("invalid remote write configuration, if you are using remote write version 2.0 the `--enable-feature=metadata-wal-records` feature flag must be enabled")
-		}
 		hash, err := toHash(rwConf)
 		if err != nil {
 			return err
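The three removed lines above were the guard that made ApplyConfig reject a remote write 2.0 config unless --enable-feature=metadata-wal-records was set. After this commit a v2 config like the one in the following sketch passes that check on its own; field values are illustrative and the config is not a complete production setup.

package main

import (
	"fmt"
	"net/url"

	common_config "github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/config"
)

func main() {
	u, err := url.Parse("http://localhost:9090/api/v1/write") // illustrative endpoint
	if err != nil {
		panic(err)
	}

	// A remote write 2.0 config: previously ApplyConfig returned an error for
	// ProtobufMessage == RemoteWriteProtoMsgV2 when metadata-wal-records was off.
	rwCfg := &config.RemoteWriteConfig{
		Name:            "rw2-example", // illustrative name
		URL:             &common_config.URL{URL: u},
		ProtobufMessage: config.RemoteWriteProtoMsgV2,
	}

	fmt.Println(rwCfg.Name, rwCfg.ProtobufMessage)
}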
@@ -117,7 +117,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
 		},
 	} {
 		t.Run("", func(t *testing.T) {
-			s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
+			s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
 			conf := &config.Config{
 				GlobalConfig: config.DefaultGlobalConfig,
 				RemoteWriteConfigs: tc.cfgs,
@@ -143,7 +143,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
 	hash, err := toHash(cfg)
 	require.NoError(t, err)
 
-	s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
+	s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
 
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
@@ -165,7 +165,7 @@ func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
 func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
+	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
 	c1 := &config.RemoteWriteConfig{
 		Name: "named",
 		URL: &common_config.URL{
@@ -206,7 +206,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
 func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
 	conf := &config.Config{
 		GlobalConfig: config.DefaultGlobalConfig,
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -222,7 +222,7 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
 func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
+	s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
 
 	externalLabels := labels.FromStrings("external", "true")
 	conf := &config.Config{
@@ -250,7 +250,7 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
 func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
 	conf := &config.Config{
 		GlobalConfig: config.GlobalConfig{},
 		RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -274,7 +274,7 @@ func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
 func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
 	dir := t.TempDir()
 
-	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
+	s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
 
 	c0 := &config.RemoteWriteConfig{
 		RemoteTimeout: model.Duration(10 * time.Second),
@@ -91,7 +91,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
 	t.Helper()
 
 	dbDir := t.TempDir()
-	rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
+	rs := remote.NewStorage(promslog.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil)
 	t.Cleanup(func() {
 		require.NoError(t, rs.Close())
 	})
@@ -737,7 +737,7 @@ func TestLockfile(t *testing.T) {
 	tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
 		logger := promslog.NewNopLogger()
 		reg := prometheus.NewRegistry()
-		rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
+		rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil)
 		t.Cleanup(func() {
 			require.NoError(t, rs.Close())
 		})
@@ -757,7 +757,7 @@ func TestLockfile(t *testing.T) {
 
 func Test_ExistingWAL_NextRef(t *testing.T) {
 	dbDir := t.TempDir()
-	rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
+	rs := remote.NewStorage(promslog.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil)
 	defer func() {
 		require.NoError(t, rs.Close())
 	}()
@@ -495,7 +495,7 @@ func TestEndpoints(t *testing.T) {
 
 	remote := remote.NewStorage(promslog.New(&promslogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
 		return 0, nil
-	}, dbDir, 1*time.Second, nil, false)
+	}, dbDir, 1*time.Second, nil)
 
 	err = remote.ApplyConfig(&config.Config{
 		RemoteReadConfigs: []*config.RemoteReadConfig{