diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index a3ad1745c..6a49f637f 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -161,12 +161,13 @@ type flagConfig struct {
memlimitRatio float64
// These options are extracted from featureList
// for ease of use.
- enableExpandExternalLabels bool
- enableNewSDManager bool
- enablePerStepStats bool
- enableAutoGOMAXPROCS bool
- enableAutoGOMEMLIMIT bool
- enableConcurrentRuleEval bool
+ enableExpandExternalLabels bool
+ enableNewSDManager bool
+ enablePerStepStats bool
+ enableAutoGOMAXPROCS bool
+ enableAutoGOMEMLIMIT bool
+ enableConcurrentRuleEval bool
+ enableRemoteWriteStatefulWatcher bool
prometheusURL string
corsRegexString string
@@ -259,6 +260,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
continue
case "promql-at-modifier", "promql-negative-offset":
level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
+ case "remote-write-stateful-watcher":
+ c.enableRemoteWriteStatefulWatcher = true
+ level.Info(logger).Log("msg", "Experimental remote write stateful watcher enabled.")
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
@@ -504,7 +508,7 @@ func main() {
a.Flag("scrape.name-escaping-scheme", `Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots".`).Default(scrape.DefaultNameEscapingScheme.String()).StringVar(&cfg.nameEscapingScheme)
- a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
+ a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), remote-write-stateful-watcher, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@@ -691,9 +695,10 @@ func main() {
level.Info(logger).Log("vm_limits", prom_runtime.VMLimits())
var (
- localStorage = &readyStorage{stats: tsdb.NewDBStats()}
- scraper = &readyScrapeManager{}
- remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata)
+ localStorage = &readyStorage{stats: tsdb.NewDBStats()}
+ scraper = &readyScrapeManager{}
+
+ remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata, cfg.enableRemoteWriteStatefulWatcher)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)
diff --git a/config/config.go b/config/config.go
index c9e8efbf3..c9d26c8a7 100644
--- a/config/config.go
+++ b/config/config.go
@@ -391,9 +391,10 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
if rwcfg == nil {
return errors.New("empty or null remote write config section")
}
- // Skip empty names, we fill their name with their config hash in remote write code.
+
+ // Skip empty names; we could fill their name with their config hash in remote write code.
if _, ok := rwNames[rwcfg.Name]; ok && rwcfg.Name != "" {
- return fmt.Errorf("found multiple remote write configs with job name %q", rwcfg.Name)
+ return fmt.Errorf("found multiple remote write configs with name %q", rwcfg.Name)
}
rwNames[rwcfg.Name] = struct{}{}
}
diff --git a/config/config_test.go b/config/config_test.go
index 7b910b5d1..6d70fda4d 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -1850,7 +1850,7 @@ var expectedErrors = []struct {
},
{
filename: "remote_write_dup.bad.yml",
- errMsg: `found multiple remote write configs with job name "queue1"`,
+ errMsg: `found multiple remote write configs with name "queue1"`,
},
{
filename: "remote_read_dup.bad.yml",
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index 8fefa8ecc..b51373bc2 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -58,7 +58,7 @@ The Prometheus monitoring server
| --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| --scrape.name-escaping-scheme | Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots". | `values` |
-| --enable-feature ... | Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
+| --enable-feature ... | Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), remote-write-stateful-watcher, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` |
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index 1ef9efb9b..8299004e5 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -277,3 +277,7 @@ specified interval. The interval is defined by the
Configuration reloads are triggered by detecting changes in the checksum of the
main configuration file or any referenced files, such as rule and scrape
configurations.
+
+## Remote Write Stateful Watcher
+
+`--enable-feature=remote-write-stateful-watcher`
+
+When enabled, each remote write queue uses a stateful WAL watcher that persists its read
+position to a progress marker file at `<data dir>/wal/remote_write/<name>.json` and, after a
+restart, resumes forwarding from that position instead of starting at the end of the WAL.
+Because the marker file is keyed by the remote write configuration's `name`, this feature flag
+requires every `remote_write` entry to have a non-empty, unique `name`. Marker files belonging
+to removed or renamed configurations are cleaned up automatically.
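(Aside, not part of the patch: the progress marker is plain JSON whose field names follow the `ProgressMarker` struct added in `tsdb/wlog/stateful_watcher.go`. Below is a minimal, standalone Go sketch of what a persisted marker would look like for a watcher that has read segment 42 up to byte offset 131072; the concrete numbers are made up for illustration.)

```go
package main

import (
	"encoding/json"
	"fmt"
)

// progressMarker mirrors the JSON tags of wlog.ProgressMarker from this patch.
type progressMarker struct {
	SegmentIndex int   `json:"segmentIndex"`
	Offset       int64 `json:"offset"`
}

func main() {
	// The stateful watcher persists a file shaped like this at
	// <data dir>/wal/remote_write/<name>.json.
	b, err := json.MarshalIndent(progressMarker{SegmentIndex: 42, Offset: 131072}, "", "\t")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```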
diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index b1c899726..0dca3871f 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -415,7 +415,7 @@ type QueueManager struct {
relabelConfigs []*relabel.Config
sendExemplars bool
sendNativeHistograms bool
- watcher *wlog.Watcher
+ watcher wlog.WALWatcher
metadataWatcher *MetadataWatcher
clientMtx sync.RWMutex
@@ -456,6 +456,7 @@ func NewQueueManager(
readerMetrics *wlog.LiveReaderMetrics,
logger log.Logger,
dir string,
+ statefulWatcher bool,
samplesIn *ewmaRate,
cfg config.QueueConfig,
mCfg config.MetadataConfig,
@@ -519,7 +520,11 @@ func NewQueueManager(
if t.protoMsg != config.RemoteWriteProtoMsgV1 {
walMetadata = true
}
+
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
+ if statefulWatcher {
+ t.watcher = wlog.NewStatefulWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
+ }
// The current MetadataWatcher implementation is mutually exclusive
// with the new approach, which stores metadata as WAL records and
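(Aside, not part of the patch: the `watcher` field now has the interface type `wlog.WALWatcher`, so either the existing `*wlog.Watcher` or the new `*wlog.StatefulWatcher` can be plugged in. The interface itself is defined elsewhere in this change; the sketch below only lists the methods implied by call sites visible in this diff plus the lifecycle methods the queue manager already uses, so treat it as an assumed shape, not the authoritative definition.)

```go
package wlogsketch

import "time"

// WALWatcher: assumed shape inferred from call sites in this diff; the real
// definition lives in tsdb/wlog and may include more or different methods.
type WALWatcher interface {
	Name() string             // identifier, also used to key progress marker files
	Start()                   // lifecycle, called by QueueManager.Start
	Stop()                    // lifecycle, called by QueueManager.Stop
	SetStartTime(t time.Time) // a no-op for the stateful watcher
	Run() error               // main read loop (used directly in BenchmarkStartup)
	PurgeState()              // stateful watchers delete their progress marker here
}
```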
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 032a1a92f..f0a5b0f77 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -17,11 +17,9 @@ import (
"context"
"errors"
"fmt"
- "math"
"math/rand"
"os"
"runtime/pprof"
- "sort"
"strconv"
"strings"
"sync"
@@ -133,7 +131,7 @@ func TestBasicContentNegotiation(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
var (
@@ -241,7 +239,7 @@ func TestSampleDelivery(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
var (
@@ -309,17 +307,17 @@ func TestSampleDelivery(t *testing.T) {
}
}
-func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg) (*TestWriteClient, *QueueManager) {
+func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg, statefulWatcher bool) (*TestWriteClient, *QueueManager) {
c := NewTestWriteClient(protoMsg)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
- return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
+ return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, statefulWatcher)
}
-func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
+func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg, statefulWatcher bool) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
- m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
+ m := NewQueueManager(metrics, nil, nil, nil, dir, statefulWatcher, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
return m
}
@@ -332,7 +330,7 @@ func testDefaultQueueConfig() config.QueueConfig {
}
func TestMetadataDelivery(t *testing.T) {
- c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
+ c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
m.Start()
defer m.Stop()
@@ -360,7 +358,7 @@ func TestMetadataDelivery(t *testing.T) {
func TestWALMetadataDelivery(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
cfg := config.DefaultQueueConfig
@@ -410,7 +408,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
- m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
+ m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@@ -447,7 +445,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
})
}
- c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
+ c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg, false)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
@@ -467,7 +465,7 @@ func TestShutdown(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
- m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
+ m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@@ -502,7 +500,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
- m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
+ m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@@ -527,7 +525,7 @@ func TestReshard(t *testing.T) {
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
- m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
+ m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg, false)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
@@ -566,7 +564,7 @@ func TestReshardRaceWithStop(t *testing.T) {
exitCh := make(chan struct{})
go func() {
for {
- m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
+ m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg, false)
m.Start()
h.Unlock()
@@ -605,7 +603,7 @@ func TestReshardPartialBatch(t *testing.T) {
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
- m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
+ m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
@@ -652,7 +650,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
- m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
+ m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@@ -679,7 +677,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
func TestReleaseNoninternedString(t *testing.T) {
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
- _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
+ _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg, false)
m.Start()
defer m.Stop()
for i := 1; i < 1000; i++ {
@@ -727,7 +725,7 @@ func TestShouldReshard(t *testing.T) {
}
for _, c := range cases {
- _, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1)
+ _, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@@ -772,7 +770,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
- m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+ m := NewQueueManager(metrics, nil, nil, nil, "", false, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0)
// Attempt to samples while the manager is running. We immediately stop the
@@ -1326,7 +1324,7 @@ func BenchmarkSampleSend(b *testing.B) {
cfg.MaxShards = 20
// todo: test with new proto type(s)
- m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
+ m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1, false)
m.StoreSeries(series, 0)
// These should be received by the client.
@@ -1385,7 +1383,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
- m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
+ m := NewQueueManager(metrics, nil, nil, nil, dir, false, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
@@ -1395,7 +1393,8 @@ func BenchmarkStoreSeries(b *testing.B) {
}
}
-func BenchmarkStartup(b *testing.B) {
+// TODO: adjust this
+/* func BenchmarkStartup(b *testing.B) {
dir := os.Getenv("WALDIR")
if dir == "" {
b.Skip("WALDIR env var not set")
@@ -1431,7 +1430,7 @@ func BenchmarkStartup(b *testing.B) {
err := m.watcher.Run()
require.NoError(b, err)
}
-}
+} */
func TestProcessExternalLabels(t *testing.T) {
b := labels.NewBuilder(labels.EmptyLabels())
@@ -1504,7 +1503,7 @@ func TestProcessExternalLabels(t *testing.T) {
func TestCalculateDesiredShards(t *testing.T) {
cfg := config.DefaultQueueConfig
- _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
+ _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
samplesIn := m.dataIn
// Need to start the queue manager so the proper metrics are initialized.
@@ -1574,7 +1573,7 @@ func TestCalculateDesiredShards(t *testing.T) {
}
func TestCalculateDesiredShardsDetail(t *testing.T) {
- _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
+ _, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
samplesIn := m.dataIn
for _, tc := range []struct {
@@ -1952,7 +1951,7 @@ func TestDropOldTimeSeries(t *testing.T) {
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
- m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
+ m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1, false)
m.StoreSeries(series, 0)
m.Start()
@@ -1987,7 +1986,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
metadataCfg.SendInterval = model.Duration(time.Second * 60)
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
- m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, config.RemoteWriteProtoMsgV1)
+ m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, config.RemoteWriteProtoMsgV1, false)
m.Start()
diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go
index d63cefc3f..36ddedd37 100644
--- a/storage/remote/read_test.go
+++ b/storage/remote/read_test.go
@@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,
diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index 05634f179..9e38c801d 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
-func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage {
+func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL, statefulWatcher bool) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
- s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
+ s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL, statefulWatcher)
return s
}
diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go
index 8c97d870e..794da54de 100644
--- a/storage/remote/storage_test.go
+++ b/storage/remote/storage_test.go
@@ -29,7 +29,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
- s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
// ApplyConfig runs concurrently with Notify
// See https://github.com/prometheus/prometheus/issues/12747
func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
- s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false)
+ s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false, false)
var wg sync.WaitGroup
wg.Add(2000)
diff --git a/storage/remote/write.go b/storage/remote/write.go
index 3d2f1fdfc..566256c01 100644
--- a/storage/remote/write.go
+++ b/storage/remote/write.go
@@ -18,10 +18,13 @@ import (
"errors"
"fmt"
"math"
+ "os"
+ "path/filepath"
"sync"
"time"
"github.com/go-kit/log"
+ "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -73,12 +76,14 @@ type WriteStorage struct {
scraper ReadyScrapeManager
quit chan struct{}
+ statefulWatcher bool
+
// For timestampTracker.
highestTimestamp *maxTimestamp
}
// NewWriteStorage creates and runs a WriteStorage.
-func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
+func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal, statefulWatcher bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -95,6 +100,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
scraper: sm,
quit: make(chan struct{}),
metadataInWAL: metadataInWal,
+ statefulWatcher: statefulWatcher,
highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@@ -124,6 +130,49 @@ func (rws *WriteStorage) run() {
}
}
+// purgeStaleWatchersMarkers iterates over progress marker files and deletes those belonging to stateful watchers that are no longer in use.
+// rws.mtx should be held before calling this.
+func (rws *WriteStorage) purgeStaleWatchersMarkers() {
+ markersDir := filepath.Join(rws.dir, "wal", wlog.ProgressMarkerDirName)
+ files, err := os.ReadDir(markersDir)
+ if err != nil {
+ level.Error(rws.logger).Log(
+ "msg", "Failed to read the directory containing the stateful watchers' progress marker files",
+ "dir", markersDir,
+ "err", err,
+ )
+ return
+ }
+
+ statefulWatchers := make(map[string]struct{})
+ for _, q := range rws.queues {
+ statefulWatchers[q.watcher.Name()] = struct{}{}
+ }
+
+ for _, f := range files {
+ fileName := f.Name()
+ if filepath.Ext(fileName) != wlog.ProgressMarkerFileExt {
+ level.Warn(rws.logger).Log(
+ "msg", "the file doesn't belong in the directory",
+ "file", fileName,
+ "dir", markersDir,
+ )
+ continue
+ }
+ watcherName := fileName[:len(fileName)-len(wlog.ProgressMarkerFileExt)]
+ if _, inUse := statefulWatchers[watcherName]; !inUse {
+ err = os.Remove(filepath.Join(markersDir, fileName))
+ if err != nil {
+ level.Error(rws.logger).Log(
+ "msg", "Failed to remove progress marker file",
+ "dir", markersDir,
+ "file", fileName,
+ "err", err,
+ )
+ }
+ }
+ }
+}
+
func (rws *WriteStorage) Notify() {
rws.mtx.Lock()
defer rws.mtx.Unlock()
@@ -148,9 +197,14 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueues := make(map[string]*QueueManager)
newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs {
+ if rwConf.Name == "" && rws.statefulWatcher {
+ return fmt.Errorf("empty name for URL: %s, the `--enable-feature=remote-write-stateful-watcher` feature flag requires it to be set as it's used as an identifier", rwConf.URL)
+ }
+
if rwConf.ProtobufMessage == config.RemoteWriteProtoMsgV2 && !rws.metadataInWAL {
return errors.New("invalid remote write configuration, if you are using remote write version 2.0 the `--enable-feature=metadata-wal-records` feature flag must be enabled")
}
+
hash, err := toHash(rwConf)
if err != nil {
return err
@@ -203,6 +257,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.liveReaderMetrics,
rws.logger,
rws.dir,
+ rws.statefulWatcher,
rws.samplesIn,
rwConf.QueueConfig,
rwConf.MetadataConfig,
@@ -221,10 +276,25 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newHashes = append(newHashes, hash)
}
- // Anything remaining in rws.queues is a queue who's config has
+ // Used to identify stale stateful watchers whose state should be purged, see below.
+ newStatefulWatchers := make(map[string]struct{})
+ if rws.statefulWatcher {
+ for _, q := range newQueues {
+ newStatefulWatchers[q.watcher.Name()] = struct{}{}
+ }
+ }
+
+ // Anything remaining in rws.queues is a queue whose config has
// changed or was removed from the overall remote write config.
for _, q := range rws.queues {
q.Stop()
+
+ if rws.statefulWatcher {
+ // Purge the stateful watcher's state if its corresponding queue config was removed.
+ if _, found := newStatefulWatchers[q.watcher.Name()]; !found {
+ q.watcher.PurgeState()
+ }
+ }
}
for _, hash := range newHashes {
@@ -271,6 +341,9 @@ func (rws *WriteStorage) Close() error {
for _, q := range rws.queues {
q.Stop()
}
+ if rws.statefulWatcher {
+ rws.purgeStaleWatchersMarkers()
+ }
close(rws.quit)
return nil
}
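(Aside, not part of the patch: the purge logic above hinges on the `<watcher name>.json` naming convention under `wal/remote_write/`. The self-contained sketch below replays that rule outside of `WriteStorage`; `watcherNameFromMarker` and the sample file names are purely illustrative.)

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// watcherNameFromMarker is a hypothetical helper mirroring how
// purgeStaleWatchersMarkers maps marker file names back to watcher names.
func watcherNameFromMarker(fileName string) (string, bool) {
	const ext = ".json" // wlog.ProgressMarkerFileExt
	if filepath.Ext(fileName) != ext {
		return "", false
	}
	return strings.TrimSuffix(fileName, ext), true
}

func main() {
	// Watchers still referenced by the current remote write configs.
	active := map[string]struct{}{"foo": {}, "baz": {}}
	for _, f := range []string{"foo.json", "bar.json", "notes.txt"} {
		name, ok := watcherNameFromMarker(f)
		if !ok {
			fmt.Println(f, "-> not a progress marker, left alone")
			continue
		}
		if _, inUse := active[name]; !inUse {
			fmt.Println(f, "-> stale, would be removed")
		} else {
			fmt.Println(f, "-> in use, kept")
		}
	}
}
```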
diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go
index 83dfffbae..ed91aac19 100644
--- a/storage/remote/write_test.go
+++ b/storage/remote/write_test.go
@@ -16,9 +16,12 @@ package remote
import (
"bytes"
"errors"
+ "fmt"
"net/http"
"net/http/httptest"
"net/url"
+ "os"
+ "path"
"testing"
"time"
@@ -35,9 +38,9 @@ import (
"github.com/prometheus/prometheus/model/relabel"
)
-func testRemoteWriteConfig() *config.RemoteWriteConfig {
+func testRemoteWriteConfig(name string) *config.RemoteWriteConfig {
return &config.RemoteWriteConfig{
- Name: "dev",
+ Name: name,
URL: &common_config.URL{
URL: &url.URL{
Scheme: "http",
@@ -105,7 +108,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
- s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
+ s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@@ -124,36 +127,40 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
}
func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
- dir := t.TempDir()
+ for _, statefulWatcher := range []bool{true, false} {
+ t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
+ dir := t.TempDir()
- cfg := testRemoteWriteConfig()
+ cfg := testRemoteWriteConfig("dev")
- hash, err := toHash(cfg)
- require.NoError(t, err)
+ hash, err := toHash(cfg)
+ require.NoError(t, err)
- s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
+ s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, statefulWatcher)
- conf := &config.Config{
- GlobalConfig: config.DefaultGlobalConfig,
- RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg},
+ conf := &config.Config{
+ GlobalConfig: config.DefaultGlobalConfig,
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg},
+ }
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Equal(t, s.queues[hash].client().Name(), cfg.Name)
+
+ // Change the queues name, ensure the queue has been restarted.
+ conf.RemoteWriteConfigs[0].Name = "dev-2"
+ require.NoError(t, s.ApplyConfig(conf))
+ hash, err = toHash(cfg)
+ require.NoError(t, err)
+ require.Equal(t, s.queues[hash].client().Name(), conf.RemoteWriteConfigs[0].Name)
+
+ require.NoError(t, s.Close())
+ })
}
- require.NoError(t, s.ApplyConfig(conf))
- require.Equal(t, s.queues[hash].client().Name(), cfg.Name)
-
- // Change the queues name, ensure the queue has been restarted.
- conf.RemoteWriteConfigs[0].Name = "dev-2"
- require.NoError(t, s.ApplyConfig(conf))
- hash, err = toHash(cfg)
- require.NoError(t, err)
- require.Equal(t, s.queues[hash].client().Name(), conf.RemoteWriteConfigs[0].Name)
-
- require.NoError(t, s.Close())
}
func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
- s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
+ s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false, false)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@@ -194,7 +201,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
dir := t.TempDir()
- s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@@ -207,16 +214,93 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
require.NoError(t, s.Close())
}
+func TestWriteStorageApplyConfig_StatefulWatcher(t *testing.T) {
+ dir := t.TempDir()
+ markersDir := path.Join(dir, "wal", "remote_write")
+
+ // WriteStorage with a WAL stateful watcher.
+ s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
+ cfg1 := testRemoteWriteConfig("")
+ cfg2 := testRemoteWriteConfig("bar")
+ conf := &config.Config{
+ GlobalConfig: config.DefaultGlobalConfig,
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg1, cfg2},
+ }
+
+ // 1. remote-write config cfg1 without a name is not accepted.
+ require.Error(t, s.ApplyConfig(conf))
+ require.Empty(t, s.queues)
+
+ // 2. Two remote-write configs with the same name are not accepted.
+ conf.RemoteWriteConfigs[0].Name = "bar"
+ require.Error(t, s.ApplyConfig(conf))
+ require.Empty(t, s.queues)
+
+ // 3. Set the cfg1 name back.
+ conf.RemoteWriteConfigs[0].Name = "foo"
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 2)
+
+ // Ensure the watchers have the appropriate names.
+ hash1, err := toHash(conf.RemoteWriteConfigs[0])
+ require.NoError(t, err)
+ require.Equal(t, "foo", s.queues[hash1].watcher.Name())
+ hash2, err := toHash(conf.RemoteWriteConfigs[1])
+ require.NoError(t, err)
+ require.Equal(t, "bar", s.queues[hash2].watcher.Name())
+
+ // Ensure the progress marker files were created.
+ require.FileExists(t, path.Join(markersDir, "foo.json"))
+ require.FileExists(t, path.Join(markersDir, "bar.json"))
+
+ // 4. Simulate a remote-write config entry removal by changing cfg2's name.
+ conf.RemoteWriteConfigs[1].Name = "baz"
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 2)
+
+ // Ensure the appropriate progress marker files are present.
+ require.FileExists(t, path.Join(markersDir, "foo.json"))
+ require.FileExists(t, path.Join(markersDir, "baz.json"))
+ require.NoFileExists(t, path.Join(markersDir, "bar.json"))
+
+ require.NoError(t, s.Close())
+
+ // Ensure progress marker files weren't deleted.
+ require.FileExists(t, path.Join(markersDir, "foo.json"))
+ require.FileExists(t, path.Join(markersDir, "baz.json"))
+}
+
+func TestWriteStorageCleanup_StatefulWatcher(t *testing.T) {
+ dir := t.TempDir()
+
+ // There are some stale progress marker files around without corresponding remote-write config entries.
+ markersDir := path.Join(dir, "wal", "remote_write")
+ require.NoError(t, os.MkdirAll(markersDir, 0o700))
+ f, err := os.Create(path.Join(markersDir, "foo.json"))
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+ f, err = os.Create(path.Join(markersDir, "bar.json"))
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ // WriteStorage should clean them while closing.
+ s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
+ require.NoError(t, s.Close())
+
+ require.NoFileExists(t, path.Join(markersDir, "foo.json"))
+ require.NoFileExists(t, path.Join(markersDir, "bar.json"))
+}
+
func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
- s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
+ s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false, false)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
- testRemoteWriteConfig(),
+ testRemoteWriteConfig("dev"),
},
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
@@ -236,136 +320,156 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
}
func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
- dir := t.TempDir()
+ for _, statefulWatcher := range []bool{true, false} {
+ t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
+ dir := t.TempDir()
- s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
- conf := &config.Config{
- GlobalConfig: config.GlobalConfig{},
- RemoteWriteConfigs: []*config.RemoteWriteConfig{
- baseRemoteWriteConfig("http://test-storage.com"),
- },
+ s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, statefulWatcher)
+ conf := &config.Config{
+ GlobalConfig: config.GlobalConfig{},
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{
+ baseRemoteWriteConfig("http://test-storage.com"),
+ },
+ }
+ if statefulWatcher {
+ conf.RemoteWriteConfigs[0].Name = "foo"
+ }
+ hash, err := toHash(conf.RemoteWriteConfigs[0])
+ require.NoError(t, err)
+
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 1)
+
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 1)
+ _, hashExists := s.queues[hash]
+ require.True(t, hashExists, "Queue pointer should have remained the same")
+
+ require.NoError(t, s.Close())
+ })
}
- hash, err := toHash(conf.RemoteWriteConfigs[0])
- require.NoError(t, err)
-
- require.NoError(t, s.ApplyConfig(conf))
- require.Len(t, s.queues, 1)
-
- require.NoError(t, s.ApplyConfig(conf))
- require.Len(t, s.queues, 1)
- _, hashExists := s.queues[hash]
- require.True(t, hashExists, "Queue pointer should have remained the same")
-
- require.NoError(t, s.Close())
}
func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
- dir := t.TempDir()
+ for _, statefulWatcher := range []bool{true, false} {
+ t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
+ dir := t.TempDir()
- s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
+ s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, statefulWatcher)
- c0 := &config.RemoteWriteConfig{
- RemoteTimeout: model.Duration(10 * time.Second),
- QueueConfig: config.DefaultQueueConfig,
- WriteRelabelConfigs: []*relabel.Config{
- {
- Regex: relabel.MustNewRegexp(".+"),
- },
- },
- ProtobufMessage: config.RemoteWriteProtoMsgV1,
- }
- c1 := &config.RemoteWriteConfig{
- RemoteTimeout: model.Duration(20 * time.Second),
- QueueConfig: config.DefaultQueueConfig,
- HTTPClientConfig: common_config.HTTPClientConfig{
- BearerToken: "foo",
- },
- ProtobufMessage: config.RemoteWriteProtoMsgV1,
- }
- c2 := &config.RemoteWriteConfig{
- RemoteTimeout: model.Duration(30 * time.Second),
- QueueConfig: config.DefaultQueueConfig,
- ProtobufMessage: config.RemoteWriteProtoMsgV1,
- }
+ c0 := &config.RemoteWriteConfig{
+ RemoteTimeout: model.Duration(10 * time.Second),
+ QueueConfig: config.DefaultQueueConfig,
+ WriteRelabelConfigs: []*relabel.Config{
+ {
+ Regex: relabel.MustNewRegexp(".+"),
+ },
+ },
+ ProtobufMessage: config.RemoteWriteProtoMsgV1,
+ }
+ if statefulWatcher {
+ c0.Name = "foo"
+ }
+ c1 := &config.RemoteWriteConfig{
+ RemoteTimeout: model.Duration(20 * time.Second),
+ QueueConfig: config.DefaultQueueConfig,
+ HTTPClientConfig: common_config.HTTPClientConfig{
+ BearerToken: "foo",
+ },
+ ProtobufMessage: config.RemoteWriteProtoMsgV1,
+ }
+ if statefulWatcher {
+ c1.Name = "bar"
+ }
+ c2 := &config.RemoteWriteConfig{
+ RemoteTimeout: model.Duration(30 * time.Second),
+ QueueConfig: config.DefaultQueueConfig,
+ ProtobufMessage: config.RemoteWriteProtoMsgV1,
+ }
+ if statefulWatcher {
+ c2.Name = "baz"
+ }
- conf := &config.Config{
- GlobalConfig: config.GlobalConfig{},
- RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
- }
- // We need to set URL's so that metric creation doesn't panic.
- for i := range conf.RemoteWriteConfigs {
- conf.RemoteWriteConfigs[i].URL = &common_config.URL{
- URL: &url.URL{
- Host: "http://test-storage.com",
- },
- }
- }
- require.NoError(t, s.ApplyConfig(conf))
- require.Len(t, s.queues, 3)
+ conf := &config.Config{
+ GlobalConfig: config.GlobalConfig{},
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
+ }
+ // We need to set URL's so that metric creation doesn't panic.
+ for i := range conf.RemoteWriteConfigs {
+ conf.RemoteWriteConfigs[i].URL = &common_config.URL{
+ URL: &url.URL{
+ Host: "http://test-storage.com",
+ },
+ }
+ }
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 3)
- hashes := make([]string, len(conf.RemoteWriteConfigs))
- queues := make([]*QueueManager, len(conf.RemoteWriteConfigs))
- storeHashes := func() {
- for i := range conf.RemoteWriteConfigs {
- hash, err := toHash(conf.RemoteWriteConfigs[i])
+ hashes := make([]string, len(conf.RemoteWriteConfigs))
+ queues := make([]*QueueManager, len(conf.RemoteWriteConfigs))
+ storeHashes := func() {
+ for i := range conf.RemoteWriteConfigs {
+ hash, err := toHash(conf.RemoteWriteConfigs[i])
+ require.NoError(t, err)
+ hashes[i] = hash
+ queues[i] = s.queues[hash]
+ }
+ }
+
+ storeHashes()
+ // Update c0 and c2.
+ c0.WriteRelabelConfigs[0] = &relabel.Config{Regex: relabel.MustNewRegexp("foo")}
+ c2.RemoteTimeout = model.Duration(50 * time.Second)
+ conf = &config.Config{
+ GlobalConfig: config.GlobalConfig{},
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
+ }
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 3)
+
+ _, hashExists := s.queues[hashes[0]]
+ require.False(t, hashExists, "The queue for the first remote write configuration should have been restarted because the relabel configuration has changed.")
+ q, hashExists := s.queues[hashes[1]]
+ require.True(t, hashExists, "Hash of unchanged queue should have remained the same")
+ require.Equal(t, q, queues[1], "Pointer of unchanged queue should have remained the same")
+ _, hashExists = s.queues[hashes[2]]
+ require.False(t, hashExists, "The queue for the third remote write configuration should have been restarted because the timeout has changed.")
+
+ storeHashes()
+ secondClient := s.queues[hashes[1]].client()
+ // Update c1.
+ c1.HTTPClientConfig.BearerToken = "bar"
+ err := s.ApplyConfig(conf)
require.NoError(t, err)
- hashes[i] = hash
- queues[i] = s.queues[hash]
- }
+ require.Len(t, s.queues, 3)
+
+ _, hashExists = s.queues[hashes[0]]
+ require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
+ q, hashExists = s.queues[hashes[1]]
+ require.True(t, hashExists, "Hash of queue with secret change should have remained the same")
+ require.NotEqual(t, secondClient, q.client(), "Pointer of a client with a secret change should not be the same")
+ _, hashExists = s.queues[hashes[2]]
+ require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
+
+ storeHashes()
+ // Delete c0.
+ conf = &config.Config{
+ GlobalConfig: config.GlobalConfig{},
+ RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
+ }
+ require.NoError(t, s.ApplyConfig(conf))
+ require.Len(t, s.queues, 2)
+
+ _, hashExists = s.queues[hashes[0]]
+ require.False(t, hashExists, "If a config is removed, the queue should be stopped and recreated.")
+ _, hashExists = s.queues[hashes[1]]
+ require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
+ _, hashExists = s.queues[hashes[2]]
+ require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
+
+ require.NoError(t, s.Close())
+ })
}
-
- storeHashes()
- // Update c0 and c2.
- c0.WriteRelabelConfigs[0] = &relabel.Config{Regex: relabel.MustNewRegexp("foo")}
- c2.RemoteTimeout = model.Duration(50 * time.Second)
- conf = &config.Config{
- GlobalConfig: config.GlobalConfig{},
- RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
- }
- require.NoError(t, s.ApplyConfig(conf))
- require.Len(t, s.queues, 3)
-
- _, hashExists := s.queues[hashes[0]]
- require.False(t, hashExists, "The queue for the first remote write configuration should have been restarted because the relabel configuration has changed.")
- q, hashExists := s.queues[hashes[1]]
- require.True(t, hashExists, "Hash of unchanged queue should have remained the same")
- require.Equal(t, q, queues[1], "Pointer of unchanged queue should have remained the same")
- _, hashExists = s.queues[hashes[2]]
- require.False(t, hashExists, "The queue for the third remote write configuration should have been restarted because the timeout has changed.")
-
- storeHashes()
- secondClient := s.queues[hashes[1]].client()
- // Update c1.
- c1.HTTPClientConfig.BearerToken = "bar"
- err := s.ApplyConfig(conf)
- require.NoError(t, err)
- require.Len(t, s.queues, 3)
-
- _, hashExists = s.queues[hashes[0]]
- require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
- q, hashExists = s.queues[hashes[1]]
- require.True(t, hashExists, "Hash of queue with secret change should have remained the same")
- require.NotEqual(t, secondClient, q.client(), "Pointer of a client with a secret change should not be the same")
- _, hashExists = s.queues[hashes[2]]
- require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
-
- storeHashes()
- // Delete c0.
- conf = &config.Config{
- GlobalConfig: config.GlobalConfig{},
- RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
- }
- require.NoError(t, s.ApplyConfig(conf))
- require.Len(t, s.queues, 2)
-
- _, hashExists = s.queues[hashes[0]]
- require.False(t, hashExists, "If a config is removed, the queue should be stopped and recreated.")
- _, hashExists = s.queues[hashes[1]]
- require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
- _, hashExists = s.queues[hashes[2]]
- require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
-
- require.NoError(t, s.Close())
}
func TestOTLPWriteHandler(t *testing.T) {
diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go
index b31041b1b..512d6c971 100644
--- a/tsdb/agent/db_test.go
+++ b/tsdb/agent/db_test.go
@@ -89,7 +89,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
t.Helper()
dbDir := t.TempDir()
- rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
+ rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@@ -585,7 +585,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
- rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
+ rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@@ -605,7 +605,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir()
- rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
+ rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false, false)
defer func() {
require.NoError(t, rs.Close())
}()
diff --git a/tsdb/wlog/stateful_watcher.go b/tsdb/wlog/stateful_watcher.go
new file mode 100644
index 000000000..9e4bc29c9
--- /dev/null
+++ b/tsdb/wlog/stateful_watcher.go
@@ -0,0 +1,850 @@
+// Copyright 2018 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wlog
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/go-kit/log"
+ "github.com/go-kit/log/level"
+
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/tsdb/record"
+
+ "github.com/prometheus/prometheus/tsdb/fileutil"
+)
+
+const (
+ ProgressMarkerDirName string = "remote_write"
+ ProgressMarkerFileExt string = ".json"
+ unknownSegment int = -1
+)
+
+// ProgressMarker tracks how far into the WAL a stateful watcher has read and forwarded data.
+// TODO: add versioning?
+type ProgressMarker struct {
+ // TODO:
+ // FirstSegment int `json:"firstSegmentIndex"`
+ // LastSegment int `json:"lastSegmentIndex"`
+
+ SegmentIndex int `json:"segmentIndex"`
+
+ // Offset of read data into the segment.
+ // Record at the offset is already processed.
+ Offset int64 `json:"offset"`
+}
+
+func (pm *ProgressMarker) String() string {
+ return fmt.Sprintf("%08d:%d", pm.SegmentIndex, pm.Offset)
+}
+
+func (pm *ProgressMarker) isUnknown() bool {
+ return pm.SegmentIndex == unknownSegment
+}
+
+func (w *StatefulWatcher) progressMarkerFilePath() string {
+ return filepath.Join(w.walDir, ProgressMarkerDirName, w.name+ProgressMarkerFileExt)
+}
+
+// TODO: copied from block.go.
+func (w *StatefulWatcher) readProgressFile() error {
+ b, err := os.ReadFile(w.progressMarkerFilePath())
+ if err != nil {
+ return err
+ }
+ var p ProgressMarker
+
+ if err := json.Unmarshal(b, &p); err != nil {
+ return err
+ }
+
+ w.progressMarker = &p
+ return nil
+}
+
+// TODO: copied from block.go.
+func (w *StatefulWatcher) writeProgressFile() (int64, error) {
+ // Make any changes to the file appear atomic.
+ path := w.progressMarkerFilePath()
+ if err := os.MkdirAll(filepath.Dir(path), 0o777); err != nil {
+ return 0, err
+ }
+ tmp := path + ".tmp"
+ defer func() {
+ if err := os.RemoveAll(tmp); err != nil {
+ level.Error(w.logger).Log("msg", "Remove tmp file", "err", err.Error())
+ }
+ }()
+
+ f, err := os.Create(tmp)
+ if err != nil {
+ return 0, err
+ }
+
+ jsonProgress, err := json.MarshalIndent(w.progressMarker, "", "\t")
+ if err != nil {
+ return 0, err
+ }
+
+ n, err := f.Write(jsonProgress)
+ if err != nil {
+ return 0, errors.Join(err, f.Close())
+ }
+
+ // Force the kernel to persist the file on disk to avoid data loss if the host crashes.
+ // TODO: really need to Sync??
+ if err := f.Sync(); err != nil {
+ return 0, errors.Join(err, f.Close())
+ }
+ if err := f.Close(); err != nil {
+ return 0, err
+ }
+ return int64(n), fileutil.Replace(tmp, path)
+}
+
+func (w *StatefulWatcher) deleteProgressFile() error {
+ return os.Remove(w.progressMarkerFilePath())
+}
+
+type segmentsRange struct {
+ firstSegment int
+ currentSegment int
+ lastSegment int
+}
+
+func (sr *segmentsRange) String() string {
+ return fmt.Sprintf("range: %08d to %08d (current: %08d)", sr.firstSegment, sr.lastSegment, sr.currentSegment)
+}
+
+func (sr *segmentsRange) isUnknown() bool {
+ return sr.firstSegment == unknownSegment || sr.currentSegment == unknownSegment || sr.lastSegment == unknownSegment
+}
+
+// StatefulWatcher watches the TSDB WAL for a given WriteTo.
+// Unlike Watcher, it keeps track of its progress and resumes where it left off.
+type StatefulWatcher struct {
+ *Watcher
+
+ // seriesOnly indicates whether the watcher is currently only processing series records
+ // to build its state of the world, or whether it has started forwarding data as well.
+ seriesOnly bool
+ progressMarker *ProgressMarker
+ segmentsState *segmentsRange
+}
+
+// NewStatefulWatcher creates a new stateful WAL watcher for a given WriteTo.
+func NewStatefulWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *StatefulWatcher {
+ if name == "" {
+ panic("name should be set as it's used as an indentifier.")
+ }
+
+ w := NewWatcher(metrics, readerMetrics, logger, name, writer, dir, sendExemplars, sendHistograms, sendMetadata)
+ return &StatefulWatcher{
+ Watcher: w,
+ progressMarker: &ProgressMarker{
+ SegmentIndex: unknownSegment,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: unknownSegment,
+ currentSegment: unknownSegment,
+ lastSegment: unknownSegment,
+ },
+ }
+}
+
+// SetStartTime is a no-op for the stateful watcher: it resumes from its persisted
+// progress marker rather than from a given start time.
+func (w *StatefulWatcher) SetStartTime(t time.Time) {
+}
+
+// Start the Watcher.
+func (w *StatefulWatcher) Start() {
+ w.setMetrics()
+
+ // Look for previous persisted progress marker.
+ err := w.readProgressFile()
+ switch {
+ case err != nil && os.IsNotExist(err):
+ level.Debug(w.logger).Log(
+ "msg", "previous progress marker was not found",
+ "name", w.name,
+ )
+ case err != nil:
+ level.Warn(w.logger).Log(
+ "msg", "progress marker file could not be read, it will be reset",
+ "file", w.progressMarkerFilePath(),
+ )
+ }
+ // Create the corresponding file as soon as possible for better visibility.
+ _, err = w.writeProgressFile()
+ if err != nil {
+ // TODO
+ panic("Return error?")
+ }
+
+ level.Info(w.logger).Log("msg", "Starting WAL watcher")
+ go w.loop()
+}
+
+// Stop the Watcher.
+func (w *StatefulWatcher) Stop() {
+ close(w.quit)
+ <-w.done
+
+ // Persist the progress marker.
+ _, err := w.writeProgressFile()
+ if err != nil {
+ // TODO
+ panic("Return error?")
+ }
+
+ // Records read metric has series and samples.
+ if w.metrics != nil {
+ w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
+ w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
+ w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
+ w.metrics.currentSegment.DeleteLabelValues(w.name)
+ // TODO: add progress-marker-related metrics (segment and offset).
+ }
+
+ level.Info(w.logger).Log("msg", "WAL watcher stopped", "name", w.name)
+}
+
+func (w *StatefulWatcher) PurgeState() {
+ <-w.done
+
+ if err := w.deleteProgressFile(); err != nil {
+ level.Error(w.logger).Log("msg", "Failed to delete the progress marker file", "file", w.progressMarkerFilePath())
+ }
+}
+
+func (w *StatefulWatcher) loop() {
+ defer close(w.done)
+
+ // We may encounter failures processing the WAL; we should wait and retry.
+ // TODO: should we increment the current segment, just skip the corrupted record, or just keep failing?
+ for !isClosed(w.quit) {
+ if err := w.Run(); err != nil {
+ level.Error(w.logger).Log("msg", "Error reading WAL", "err", err)
+ }
+
+ select {
+ case <-w.quit:
+ return
+ case <-time.After(5 * time.Second):
+ }
+ }
+}
+
+// syncOnIteration reacts to changes in the WAL segments (segment addition, deletion) to ensure appropriate segments are considered.
+// It ensures that on each iteration:
+// firstSegment <= w.progressMarker.segmentIndex <= lastSegment
+// and
+// firstSegment <= currentSegment <= lastSegment.
+// When currentSegment is adjusted, the appropriate Checkpoint is read if available.
+func (w *StatefulWatcher) syncOnIteration() error {
+ // Look for the last segment.
+ _, lastSegment, err := Segments(w.walDir)
+ if err != nil {
+ return fmt.Errorf("Segments: %w", err)
+ }
+
+ // Look for the first segment after the last checkpoint.
+ lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
+ if err != nil && !errors.Is(err, record.ErrNotFound) {
+ return fmt.Errorf("LastCheckpoint: %w", err)
+ }
+ // Decision is made later whether the Checkpoint needs to be read.
+ checkpointAvailable := err == nil
+ firstSegment, err := w.findSegmentForIndex(checkpointIndex)
+ if err != nil {
+ return err
+ }
+
+ level.Debug(w.logger).Log(
+ "msg", "syncOnIteration",
+ "prevSegmentsState", w.segmentsState,
+ "prevProgressMarker", w.progressMarker,
+ "firstSegment", firstSegment,
+ "lastSegment", lastSegment,
+ "lastCheckpoint", lastCheckpoint,
+ "checkpointIndex", checkpointIndex,
+ )
+
+ // Segments are identified by their names, if nothing changed, return early.
+ if firstSegment == w.segmentsState.firstSegment && lastSegment >= w.segmentsState.lastSegment {
+ w.segmentsState.lastSegment = lastSegment
+ return nil
+ }
+
+ prevProgressMarker := *(w.progressMarker)
+ prevSegmentsState := *(w.segmentsState)
+
+ readCheckpoint := func() error {
+ // If the current segment is changed, reading the Checkpoint is needed.
+ if !checkpointAvailable || w.segmentsState.currentSegment == prevSegmentsState.currentSegment {
+ return nil
+ }
+ if err = w.readCheckpoint(lastCheckpoint, checkpointIndex, (*StatefulWatcher).readRecords); err != nil {
+ return fmt.Errorf("readCheckpoint: %w", err)
+ }
+ w.lastCheckpoint = lastCheckpoint
+ return nil
+ }
+
+ // First iteration.
+ if w.segmentsState.isUnknown() {
+ switch {
+ case w.progressMarker.isUnknown():
+ // Start from the last segment if the progress marker is unknown.
+ w.setProgress(lastSegment, 0)
+ case prevProgressMarker.SegmentIndex < firstSegment:
+ // M
+ // [* * * *
+ // ^
+ // M'
+ w.setProgress(firstSegment, 0)
+ case prevProgressMarker.SegmentIndex > lastSegment:
+ // M
+ // * * * *]
+ // ^
+ // M'
+ w.setProgress(lastSegment, 0)
+ }
+ w.setSegmentsState(firstSegment, firstSegment, lastSegment)
+
+ return readCheckpoint()
+ }
+
+ // Adjust to changes of the first segment.
+ if firstSegment < prevSegmentsState.firstSegment {
+ // [* * * * [* * * *
+ // ^ ^
+ // F F
+ // [* * * * * [* * * * * * * * *
+ // ^ ^
+ // F'=C'=M' F'=C'=M'
+ level.Info(w.logger).Log(
+ "msg", "Segments were added before the previous first segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "firstSegment", firstSegment,
+ )
+ w.setProgress(firstSegment, 0)
+ w.segmentsState.currentSegment = firstSegment
+ }
+ if firstSegment > prevSegmentsState.firstSegment {
+ if firstSegment <= prevProgressMarker.SegmentIndex {
+ // [* * * * * [* * * * *
+ // ^ ^
+ // M M
+ // [* * * * [* * *
+ // ^ ^
+ // M' M'
+ level.Debug(w.logger).Log(
+ "msg", "Segments whose data was read and forwarded were removed",
+ "prevSegmentsState", &prevSegmentsState,
+ "prevProgressMarker", &prevProgressMarker,
+ "firstSegment", firstSegment,
+ )
+ } else {
+ // [* * * * * * * [* * * * *
+ // ^ ^
+ // M M
+ // [* * * * [* * * *
+ // ^ ^
+ // M' M'
+ w.setProgress(firstSegment, 0)
+ level.Warn(w.logger).Log(
+ "msg", "Segments were removed before the watcher ensured all data in them was read and forwarded",
+ "prevSegmentsState", &prevSegmentsState,
+ "prevProgressMarker", &prevProgressMarker,
+ "firstSegment", firstSegment,
+ )
+ }
+
+ if firstSegment <= prevSegmentsState.currentSegment {
+ // [* * * * * [* * * * * *
+ // ^ ^
+ // C C
+ // [* * * * [* * * *
+ // ^ ^
+ // C' C'
+ level.Debug(w.logger).Log(
+ "msg", "Segments that were already replayed were removed",
+ "prevSegmentsState", &prevSegmentsState,
+ "firstSegment", firstSegment,
+ )
+ } else {
+ // [* * * * * * * [* * * * * *
+ // ^ ^
+ // C C
+ // [* * * [* * * * *
+ // ^ ^
+ // C' C'
+ w.segmentsState.currentSegment = firstSegment
+ level.Warn(w.logger).Log(
+ "msg", "Segments were removed before they could be read",
+ "prevSegmentsState", &prevSegmentsState,
+ "firstSegment", firstSegment,
+ )
+ }
+ }
+ if firstSegment > prevSegmentsState.lastSegment {
+ // * * * * *]
+ // ^
+ // L
+ // [* * * *
+ // ^
+ // F'
+ level.Debug(w.logger).Log(
+ "msg", "All segments are subsequent to the previous last segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "firstSegment", firstSegment,
+ )
+ }
+
+ // Adjust on last segment changes.
+ if lastSegment > prevSegmentsState.lastSegment {
+ // * * * * *]
+ // ^
+ // F
+ // * * * * * * *]
+ // ^
+ // F'
+ level.Debug(w.logger).Log(
+ "msg", "New segments were added after the previous last segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "lastSegment", lastSegment,
+ )
+ }
+ if lastSegment < prevSegmentsState.lastSegment {
+ if lastSegment >= prevProgressMarker.SegmentIndex {
+ // * * * * *] * * * * *]
+ // ^ ^
+ // M M
+ // * * * *] * * * *]
+ // ^ ^
+ // M' M'
+ level.Warn(w.logger).Log(
+ "msg", "Segments were removed before the watcher ensured all data in them was read and forwarded",
+ "prevSegmentsState", &prevSegmentsState,
+ "prevProgressMarker", &prevProgressMarker,
+ "lastSegment", lastSegment,
+ )
+ } else {
+ if firstSegment >= prevSegmentsState.firstSegment {
+ // [* * * * * * *]
+ // ^
+ // M
+ // [* * * *]
+ // ^
+ // M'
+ w.setProgress(lastSegment, 0)
+ }
+ level.Debug(w.logger).Log(
+ "msg", "The progress marker points to a segment beyond the last segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "prevProgressMarker", &prevProgressMarker,
+ "lastSegment", lastSegment,
+ )
+ }
+
+ if lastSegment >= prevSegmentsState.currentSegment {
+ // * * * * *] * * * * *]
+ // ^ ^
+ // C C
+ // * * * *] * * * *]
+ // ^ ^
+ // C' C'
+ level.Warn(w.logger).Log(
+ "msg", "Segments were removed before they could be read",
+ "prevSegmentsState", &prevSegmentsState,
+ "lastSegment", lastSegment,
+ )
+ } else {
+ if firstSegment >= prevSegmentsState.firstSegment {
+ // [* * * * * * *]
+ // ^
+ // C
+ // [* * * *]
+ // ^
+ // C'
+ w.segmentsState.currentSegment = lastSegment
+ }
+ level.Debug(w.logger).Log(
+ "msg", "The segment to read is beyond the last segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "prevProgressMarker", &prevProgressMarker,
+ "lastSegment", lastSegment,
+ )
+ }
+ }
+ if lastSegment < prevSegmentsState.firstSegment {
+ // [* * * * *
+ // ^
+ // F
+ // * * * * *]
+ // ^
+ // L'
+ level.Debug(w.logger).Log(
+ "msg", "All segments are prior to the previous first segment",
+ "prevSegmentsState", &prevSegmentsState,
+ "lastSegment", lastSegment,
+ )
+ }
+
+ w.setSegmentsState(firstSegment, w.segmentsState.currentSegment, lastSegment)
+ return readCheckpoint()
+}
+
+// Run the watcher: replay the WAL from the last checkpoint, resume forwarding data at the stored
+// progress marker, and tail the last segment until the quit channel is closed or an error case is hit.
+func (w *StatefulWatcher) Run() error {
+ level.Info(w.logger).Log("msg", "Replaying WAL")
+
+ for !isClosed(w.quit) {
+ // The watcher will decide when to start forwarding data while reading the segment.
+ w.seriesOnly = true
+
+ if err := w.syncOnIteration(); err != nil {
+ return err
+ }
+ w.currentSegmentMetric.Set(float64(w.segmentsState.currentSegment))
+
+ switch {
+ case w.progressMarker.SegmentIndex <= w.segmentsState.currentSegment && w.segmentsState.currentSegment < w.segmentsState.lastSegment:
+ level.Debug(w.logger).Log(
+				"msg", "Reading a segment at or after the one where the watcher left off",
+ "segmentsState", w.segmentsState,
+ "progressMarker", w.progressMarker,
+ )
+ if err := w.readSegmentStrict(w.walDir, w.segmentsState.currentSegment, (*StatefulWatcher).readRecords); err != nil {
+ return err
+ }
+ case w.segmentsState.currentSegment < w.progressMarker.SegmentIndex:
+ level.Debug(w.logger).Log(
+ "msg", "Reading a segment prior to where the watcher left off",
+ "segmentsState", w.segmentsState,
+ "progressMarker", w.progressMarker,
+ )
+ w.readSegmentIgnoreErrors(w.walDir, w.segmentsState.currentSegment, (*StatefulWatcher).readRecords)
+ default:
+ level.Debug(w.logger).Log(
+ "msg", "Reading and tailing the last segment",
+ "segmentsState", w.segmentsState,
+ "progressMarker", w.progressMarker,
+ )
+ if err := w.watch(w.segmentsState.currentSegment); err != nil {
+ return err
+ }
+ }
+
+ // For testing: stop when you hit a specific segment.
+ if w.segmentsState.currentSegment == w.MaxSegment {
+ return nil
+ }
+
+ w.segmentsState.currentSegment++
+ }
+ return nil
+}
+
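+// watch reads the given segment to its current end, then tails it: it keeps reading on write
+// notifications and read timeouts, garbage collects series on checkpoint ticks, and returns once a
+// newer segment appears or the watcher is stopped.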
+func (w *StatefulWatcher) watch(segmentIndex int) error {
+ segment, err := OpenReadSegment(SegmentName(w.walDir, segmentIndex))
+ if err != nil {
+ return err
+ }
+ defer segment.Close()
+
+ reader := NewLiveReader(w.logger, w.readerMetrics, segment)
+ // Don't wait for tickers to process records that are already in the segment.
+ err = w.readRecordsIgnoreEOF(reader, segmentIndex)
+ if err != nil {
+ return err
+ }
+
+ checkpointTicker := time.NewTicker(w.checkpointPeriod)
+ defer checkpointTicker.Stop()
+
+ segmentTicker := time.NewTicker(w.segmentCheckPeriod)
+ defer segmentTicker.Stop()
+
+ readTicker := time.NewTicker(w.readTimeout)
+ defer readTicker.Stop()
+
+ gcSem := make(chan struct{}, 1)
+ for {
+ select {
+ case <-w.quit:
+ return nil
+
+ case <-checkpointTicker.C:
+ // Periodically check if there is a new checkpoint so we can garbage
+ // collect labels. As this is considered an optimisation, we ignore
+ // errors during checkpoint processing. Doing the process asynchronously
+ // allows the current WAL segment to be processed while reading the
+ // checkpoint.
+ select {
+ case gcSem <- struct{}{}:
+ go func() {
+ defer func() {
+ <-gcSem
+ }()
+ if err := w.garbageCollectSeries(segmentIndex); err != nil {
+						level.Warn(w.logger).Log("msg", "Error processing checkpoint", "err", err)
+ }
+ }()
+ default:
+ // Currently doing a garbage collection, try again later.
+ }
+
+		// If a newer segment is produced, read the current one until the end and move on.
+ case <-segmentTicker.C:
+ _, last, err := Segments(w.walDir)
+ if err != nil {
+ return fmt.Errorf("Segments: %w", err)
+ }
+
+ if last > segmentIndex {
+ return w.readRecordsIgnoreEOF(reader, segmentIndex)
+ }
+
+ // we haven't read due to a notification in quite some time, try reading anyways
+ case <-readTicker.C:
+ level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout)
+ err := w.readRecordsIgnoreEOF(reader, segmentIndex)
+ if err != nil {
+ return err
+ }
+ // reset the ticker so we don't read too often
+ readTicker.Reset(w.readTimeout)
+
+ case <-w.readNotify:
+ err := w.readRecordsIgnoreEOF(reader, segmentIndex)
+ if err != nil {
+ return err
+ }
+ // reset the ticker so we don't read too often
+ readTicker.Reset(w.readTimeout)
+ }
+ }
+}
+
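+// recordsReadFn abstracts how the records of a segment are decoded and forwarded, so the same
+// reading logic can be shared by readSegmentStrict, readSegmentIgnoreErrors and readCheckpoint.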
+type recordsReadFn func(w *StatefulWatcher, r *LiveReader, segmentIndex int) error
+
+// readSegmentStrict reads the segment and returns an error if the segment was not read completely.
+func (w *StatefulWatcher) readSegmentStrict(segmentDir string, segmentIndex int, readFn recordsReadFn) error {
+ segment, err := OpenReadSegment(SegmentName(segmentDir, segmentIndex))
+ if err != nil {
+ return fmt.Errorf("unable to open segment: %w", err)
+ }
+ defer segment.Close()
+
+ size, err := getSegmentSize(segmentDir, segmentIndex)
+ if err != nil {
+ return fmt.Errorf("getSegmentSize: %w", err)
+ }
+
+ reader := NewLiveReader(w.logger, w.readerMetrics, segment)
+ err = readFn(w, reader, segmentIndex)
+
+ if errors.Is(err, io.EOF) {
+ if reader.Offset() == size {
+ return nil
+ }
+ return fmt.Errorf("readSegmentStrict wasn't able to read all data from the segment %08d, size: %d, totalRead: %d", segmentIndex, size, reader.Offset())
+ }
+ return err
+}
+
+// readSegmentIgnoreErrors reads the segment; it logs and ignores any errors encountered.
+// TODO: document why we need this, https://github.com/prometheus/prometheus/pull/5214/files
+func (w *StatefulWatcher) readSegmentIgnoreErrors(segmentDir string, segmentIndex int, readFn recordsReadFn) {
+ err := w.readSegmentStrict(segmentDir, segmentIndex, readFn)
+ if err != nil {
+ level.Warn(w.logger).Log("msg", "Ignoring error reading the segment, may have dropped data", "segment", segmentIndex, "err", err)
+ }
+}
+
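+// checkProgressMarkerReached switches the watcher from reading series only to forwarding data once
+// reading has progressed past the stored progress marker.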
+func (w *StatefulWatcher) checkProgressMarkerReached(segmentIndex int, offset int64) {
+ if !w.seriesOnly {
+ return
+ }
+ // Check if we arrived back where we left off.
+	// The comparison uses offset > w.progressMarker.Offset because the segment may have been altered and replaced, which is neither detected nor supported.
+ if segmentIndex > w.progressMarker.SegmentIndex || (segmentIndex == w.progressMarker.SegmentIndex && offset > w.progressMarker.Offset) {
+ // make the watcher start forwarding data.
+ w.seriesOnly = false
+ }
+}
+
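+// setSegmentsState records the first, current and last segment indexes tracked by the watcher.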
+func (w *StatefulWatcher) setSegmentsState(firstSegment, currentSegment, lastSegment int) {
+ w.segmentsState.firstSegment = firstSegment
+ w.segmentsState.currentSegment = currentSegment
+ w.segmentsState.lastSegment = lastSegment
+}
+
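+// setProgress moves the progress marker unconditionally; UpdateProgress should be preferred while
+// reading records so the marker only advances once data is actually forwarded.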
+func (w *StatefulWatcher) setProgress(segmentIndex int, offset int64) {
+ w.progressMarker.SegmentIndex = segmentIndex
+ w.progressMarker.Offset = offset
+}
+
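+// UpdateProgress advances the progress marker as records are read and forwarded. It is a no-op
+// while the watcher is still catching up to where it previously left off.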
+func (w *StatefulWatcher) UpdateProgress(segmentIndex int, offset int64) {
+ // The watcher isn't forwarding data yet.
+ if w.seriesOnly {
+ return
+ }
+ w.setProgress(segmentIndex, offset)
+}
+
+// Read records and pass the details to w.writer.
+// Also used with readCheckpoint - implements recordsReadFn.
+func (w *StatefulWatcher) readRecords(r *LiveReader, segmentIndex int) error {
+ var (
+ dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
+ series []record.RefSeries
+ samples []record.RefSample
+ exemplars []record.RefExemplar
+ histograms []record.RefHistogramSample
+ floatHistograms []record.RefFloatHistogramSample
+ metadata []record.RefMetadata
+ )
+
+ for r.Next() && !isClosed(w.quit) {
+ w.checkProgressMarkerReached(segmentIndex, r.Offset())
+ rec := r.Record()
+ w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
+
+ switch dec.Type(rec) {
+ case record.Series:
+ series, err := dec.Series(rec, series[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ w.writer.StoreSeries(series, segmentIndex)
+
+ case record.Samples:
+ if w.seriesOnly {
+ break
+ }
+ samples, err := dec.Samples(rec, samples[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ if len(samples) > 0 {
+ w.writer.Append(samples)
+ }
+
+ case record.Exemplars:
+ // Skip if experimental "exemplars over remote write" is not enabled.
+ if !w.sendExemplars || w.seriesOnly {
+ break
+ }
+
+ exemplars, err := dec.Exemplars(rec, exemplars[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ if len(exemplars) > 0 {
+ w.writer.AppendExemplars(exemplars)
+ }
+ case record.HistogramSamples:
+ // Skip if experimental "histograms over remote write" is not enabled.
+ if !w.sendHistograms || w.seriesOnly {
+ break
+ }
+ histograms, err := dec.HistogramSamples(rec, histograms[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ if len(histograms) > 0 {
+ w.writer.AppendHistograms(histograms)
+ }
+ case record.FloatHistogramSamples:
+ // Skip if experimental "histograms over remote write" is not enabled.
+ if !w.sendHistograms || w.seriesOnly {
+ break
+ }
+ floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ if len(floatHistograms) > 0 {
+ w.writer.AppendFloatHistograms(floatHistograms)
+ }
+ case record.Metadata:
+ if !w.sendMetadata {
+ break
+ }
+ meta, err := dec.Metadata(rec, metadata[:0])
+ if err != nil {
+ w.recordDecodeFailsMetric.Inc()
+ return err
+ }
+ w.writer.StoreMetadata(meta)
+ case record.Tombstones:
+
+ default:
+ // Could be corruption, or reading from a WAL from a newer Prometheus.
+ w.recordDecodeFailsMetric.Inc()
+ }
+
+ // TODO: check AppendXXX returned values.
+ // In the future, the shards will influence this.
+ w.UpdateProgress(segmentIndex, r.Offset())
+ }
+ if err := r.Err(); err != nil {
+		return fmt.Errorf("readRecords from segment %08d: %w", segmentIndex, err)
+ }
+ return nil
+}
+
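+// readRecordsIgnoreEOF reads records and treats io.EOF as a normal end of the available data.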
+func (w *StatefulWatcher) readRecordsIgnoreEOF(r *LiveReader, segmentIndex int) error {
+ if err := w.readRecords(r, segmentIndex); err != nil && !errors.Is(err, io.EOF) {
+ return err
+ }
+ return nil
+}
+
+// Read all the segments from a Checkpoint directory.
+func (w *StatefulWatcher) readCheckpoint(checkpointDir string, checkpointIndex int, readFn recordsReadFn) error {
+ level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
+ // Records from these segments should be marked as originating from the Checkpoint index itself.
+ indexAwareReadFn := func(w *StatefulWatcher, r *LiveReader, _ int) error {
+ return readFn(w, r, checkpointIndex)
+ }
+ // Read all segments in the checkpoint dir.
+ segs, err := listSegments(checkpointDir)
+ if err != nil {
+		return fmt.Errorf("unable to get segments in checkpoint dir: %w", err)
+ }
+ for _, segRef := range segs {
+ err := w.readSegmentStrict(checkpointDir, segRef.index, indexAwareReadFn)
+ if err != nil {
+ return fmt.Errorf("readSegmentStrict from the checkpoint %s: %w", checkpointDir, err)
+ }
+ }
+
+ level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
+ return nil
+}
diff --git a/tsdb/wlog/stateful_watcher_test.go b/tsdb/wlog/stateful_watcher_test.go
new file mode 100644
index 000000000..209fc233d
--- /dev/null
+++ b/tsdb/wlog/stateful_watcher_test.go
@@ -0,0 +1,1450 @@
+// Copyright 2024 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package wlog
+
+import (
+ "fmt"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "runtime"
+ "slices"
+ "testing"
+ "time"
+
+ "github.com/go-kit/log"
+ "github.com/stretchr/testify/require"
+
+ "github.com/prometheus/prometheus/model/histogram"
+ "github.com/prometheus/prometheus/model/labels"
+ "github.com/prometheus/prometheus/tsdb/chunks"
+ "github.com/prometheus/prometheus/tsdb/record"
+)
+
+// The following tests are copied and adapted from watcher_test.go
+
+func TestTailSamples_Stateful(t *testing.T) {
+ t.Parallel()
+
+ pageSize := 32 * 1024
+ const seriesCount = 10
+ const samplesCount = 250
+ const exemplarsCount = 25
+ const histogramsCount = 50
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
+ now := time.Now()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+
+ // Write to the initial segment then checkpoint.
+ for i := 0; i < seriesCount; i++ {
+ ref := i + 100
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for j := 0; j < samplesCount; j++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: now.UnixNano() + 1,
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+
+ for j := 0; j < exemplarsCount; j++ {
+ inner := rand.Intn(ref + 1)
+ exemplar := enc.Exemplars([]record.RefExemplar{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: now.UnixNano() + 1,
+ V: float64(i),
+ Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", inner)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(exemplar))
+ }
+
+ for j := 0; j < histogramsCount; j++ {
+ inner := rand.Intn(ref + 1)
+ hist := &histogram.Histogram{
+ Schema: 2,
+ ZeroThreshold: 1e-128,
+ ZeroCount: 0,
+ Count: 2,
+ Sum: 0,
+ PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
+ PositiveBuckets: []int64{int64(i) + 1},
+ NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
+ NegativeBuckets: []int64{int64(-i) - 1},
+ }
+
+ histogram := enc.HistogramSamples([]record.RefHistogramSample{{
+ Ref: chunks.HeadSeriesRef(inner),
+ T: now.UnixNano() + 1,
+ H: hist,
+ }}, nil)
+ require.NoError(t, w.Log(histogram))
+
+ floatHistogram := enc.FloatHistogramSamples([]record.RefFloatHistogramSample{{
+ Ref: chunks.HeadSeriesRef(inner),
+ T: now.UnixNano() + 1,
+ FH: hist.ToFloat(nil),
+ }}, nil)
+ require.NoError(t, w.Log(floatHistogram))
+ }
+ }
+
+ // Start read after checkpoint, no more data written.
+ first, last, err := Segments(w.Dir())
+ require.NoError(t, err)
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true)
+ watcher.SetStartTime(now)
+
+ // Set the Watcher's metrics so they're not nil pointers.
+ watcher.setMetrics()
+ for i := first; i <= last; i++ {
+ watcher.readSegmentStrict(wdir, i, (*StatefulWatcher).readRecords)
+ }
+
+ expectedSeries := seriesCount
+ expectedSamples := seriesCount * samplesCount
+ expectedExemplars := seriesCount * exemplarsCount
+ expectedHistograms := seriesCount * histogramsCount
+ retry(t, defaultRetryInterval, defaultRetries, func() bool {
+ return wt.seriesCount() >= expectedSeries
+ })
+ require.Equal(t, expectedSeries, wt.seriesCount(), "did not receive the expected number of series")
+ require.Equal(t, expectedSamples, wt.samplesCount(), "did not receive the expected number of samples")
+ require.Equal(t, expectedExemplars, wt.examplarsCount(), "did not receive the expected number of exemplars")
+ require.Equal(t, expectedHistograms, wt.histogramsCount(), "did not receive the expected number of histograms")
+ require.Equal(t, expectedHistograms, wt.floatHistogramsCount(), "did not receive the expected number of float histograms")
+ })
+ }
+}
+
+func TestReadToEndNoCheckpoint_Stateful(t *testing.T) {
+ t.Parallel()
+
+ pageSize := 32 * 1024
+ const seriesCount = 10
+ const samplesCount = 250
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+
+ var recs [][]byte
+
+ enc := record.Encoder{}
+
+ for i := 0; i < seriesCount; i++ {
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(i),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ recs = append(recs, series)
+ for j := 0; j < samplesCount; j++ {
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(j),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+
+ recs = append(recs, sample)
+
+ // Randomly batch up records.
+ if rand.Intn(4) < 3 {
+ require.NoError(t, w.Log(recs...))
+ recs = recs[:0]
+ }
+ }
+ }
+ require.NoError(t, w.Log(recs...))
+ _, _, err = Segments(w.Dir())
+ require.NoError(t, err)
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
+ go watcher.Start()
+
+ expected := seriesCount
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == expected
+ }, 20*time.Second, 1*time.Second)
+ watcher.Stop()
+ })
+ }
+}
+
+func TestReadToEndWithCheckpoint_Stateful(t *testing.T) {
+ t.Parallel()
+
+ segmentSize := 32 * 1024
+ // We need something similar to this # of series and samples
+ // in order to get enough segments for us to checkpoint.
+ const seriesCount = 10
+ const samplesCount = 250
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, segmentSize, compress)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+
+ // Write to the initial segment then checkpoint.
+ for i := 0; i < seriesCount; i++ {
+ ref := i + 100
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+ // Add in an unknown record type, which should be ignored.
+ require.NoError(t, w.Log([]byte{255}))
+
+ for j := 0; j < samplesCount; j++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+
+ Checkpoint(log.NewNopLogger(), w, 0, 1, func(x chunks.HeadSeriesRef) bool { return true }, 0)
+ w.Truncate(1)
+
+ // Write more records after checkpointing.
+ for i := 0; i < seriesCount; i++ {
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(i),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for j := 0; j < samplesCount; j++ {
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(j),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "bar", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
+ go watcher.Start()
+
+ expected := seriesCount * 2
+
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == expected
+ }, 10*time.Second, 1*time.Second)
+ watcher.Stop()
+ })
+ }
+}
+
+func TestReadCheckpoint_Stateful(t *testing.T) {
+ t.Parallel()
+
+ pageSize := 32 * 1024
+ const seriesCount = 10
+ const samplesCount = 250
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ f, err := os.Create(SegmentName(wdir, 30))
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, 128*pageSize, compress)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ require.NoError(t, w.Close())
+ })
+
+ // Write to the initial segment then checkpoint.
+ for i := 0; i < seriesCount; i++ {
+ ref := i + 100
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for j := 0; j < samplesCount; j++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+ _, err = w.NextSegmentSync()
+ require.NoError(t, err)
+ _, err = Checkpoint(log.NewNopLogger(), w, 30, 31, func(x chunks.HeadSeriesRef) bool { return true }, 0)
+ require.NoError(t, err)
+ require.NoError(t, w.Truncate(32))
+
+ // Start read after checkpoint, no more data written.
+ _, _, err = Segments(w.Dir())
+ require.NoError(t, err)
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, false, false, false)
+ go watcher.Start()
+
+ expectedSeries := seriesCount
+ retry(t, defaultRetryInterval, defaultRetries, func() bool {
+ return wt.seriesCount() >= expectedSeries
+ })
+ watcher.Stop()
+ require.Equal(t, expectedSeries, wt.seriesCount())
+ })
+ }
+}
+
+func TestReadCheckpointMultipleSegments_Stateful(t *testing.T) {
+ t.Parallel()
+
+ pageSize := 32 * 1024
+
+ const segments = 1
+ const seriesCount = 20
+ const samplesCount = 300
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, pageSize, compress)
+ require.NoError(t, err)
+
+ // Write a bunch of data.
+ for i := 0; i < segments; i++ {
+ for j := 0; j < seriesCount; j++ {
+ ref := j + (i * 100)
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for k := 0; k < samplesCount; k++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+ }
+ require.NoError(t, w.Close())
+
+			// At this point we should have at least 6 segments, let's create a checkpoint dir of the first 5.
+ checkpointDir := dir + "/wal/checkpoint.000004"
+ err = os.Mkdir(checkpointDir, 0o777)
+ require.NoError(t, err)
+ for i := 0; i <= 4; i++ {
+ err := os.Rename(SegmentName(dir+"/wal", i), SegmentName(checkpointDir, i))
+ require.NoError(t, err)
+ }
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, false, false, false)
+ watcher.MaxSegment = -1
+
+ // Set the Watcher's metrics so they're not nil pointers.
+ watcher.setMetrics()
+
+ lastCheckpoint, checkpointIndex, err := LastCheckpoint(watcher.walDir)
+ require.NoError(t, err)
+
+ err = watcher.readCheckpoint(lastCheckpoint, checkpointIndex, (*StatefulWatcher).readRecords)
+ require.NoError(t, err)
+ })
+ }
+}
+
+func TestCheckpointSeriesReset_Stateful(t *testing.T) {
+ t.Parallel()
+
+ segmentSize := 32 * 1024
+ // We need something similar to this # of series and samples
+ // in order to get enough segments for us to checkpoint.
+ const seriesCount = 20
+ const samplesCount = 350
+ testCases := []struct {
+ compress CompressionType
+ segments int
+ }{
+ {compress: CompressionNone, segments: 14},
+ {compress: CompressionSnappy, segments: 13},
+ }
+
+ for _, tc := range testCases {
+ t.Run(fmt.Sprintf("compress=%s", tc.compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+
+ // Write to the initial segment, then checkpoint later.
+ for i := 0; i < seriesCount; i++ {
+ ref := i + 100
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for j := 0; j < samplesCount; j++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+
+ _, _, err = Segments(w.Dir())
+ require.NoError(t, err)
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "foo", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
+ watcher.MaxSegment = -1
+ go watcher.Start()
+
+ expected := seriesCount
+ retry(t, defaultRetryInterval, defaultRetries, func() bool {
+ return wt.seriesCount() >= expected
+ })
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == seriesCount
+ }, 10*time.Second, 1*time.Second)
+
+ _, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0)
+ require.NoError(t, err)
+
+ err = w.Truncate(5)
+ require.NoError(t, err)
+
+ _, cpi, err := LastCheckpoint(path.Join(dir, "wal"))
+ require.NoError(t, err)
+ err = watcher.garbageCollectSeries(cpi + 1)
+ require.NoError(t, err)
+
+ watcher.Stop()
+			// If you modify the checkpoint and truncate segment numbers, run the test to see how many
+			// series records you end up with and change the last Equals check accordingly, or modify
+			// the Equals to Assert(len(wt.seriesLabels) < seriesCount*10).
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == tc.segments
+ }, 20*time.Second, 1*time.Second)
+ })
+ }
+}
+
+func TestRun_StartupTime_Stateful(t *testing.T) {
+ t.Parallel()
+
+ const pageSize = 32 * 1024
+ const segments = 10
+ const seriesCount = 20
+ const samplesCount = 300
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(string(compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ enc := record.Encoder{}
+ w, err := NewSize(nil, nil, wdir, pageSize, compress)
+ require.NoError(t, err)
+
+ for i := 0; i < segments; i++ {
+ for j := 0; j < seriesCount; j++ {
+ ref := j + (i * 100)
+ series := enc.Series([]record.RefSeries{
+ {
+ Ref: chunks.HeadSeriesRef(ref),
+ Labels: labels.FromStrings("__name__", fmt.Sprintf("metric_%d", i)),
+ },
+ }, nil)
+ require.NoError(t, w.Log(series))
+
+ for k := 0; k < samplesCount; k++ {
+ inner := rand.Intn(ref + 1)
+ sample := enc.Samples([]record.RefSample{
+ {
+ Ref: chunks.HeadSeriesRef(inner),
+ T: int64(i),
+ V: float64(i),
+ },
+ }, nil)
+ require.NoError(t, w.Log(sample))
+ }
+ }
+ }
+ require.NoError(t, w.Close())
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "bar", wt, dir, false, false, false)
+ watcher.MaxSegment = segments
+ watcher.setMetrics()
+
+ startTime := time.Now()
+ err = watcher.Run()
+ require.Less(t, time.Since(startTime), watcher.readTimeout)
+ require.NoError(t, err)
+ })
+ }
+}
+
+func TestRun_AvoidNotifyWhenBehind_Stateful(t *testing.T) {
+ t.Parallel()
+
+ if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
+ t.SkipNow()
+ }
+ const segmentsCount = 100
+ const seriesCount = 1
+ const samplesCount = 1
+
+ for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
+ t.Run(string(compress), func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ wt := newWriteToMock(time.Millisecond)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, false, false, false)
+ watcher.readTimeout = 15 * time.Second
+			// The period is chosen in such a way as to not slow down the tests, but also to ensure
+			// the duration comparisons make sense.
+ watcher.segmentCheckPeriod = 100 * time.Millisecond
+ watcher.setMetrics()
+
+ w, err := NewSize(nil, nil, wdir, pageSize, CompressionNone)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+ // Write to the first segment.
+ require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+
+ // Start the watcher and ensure the first segment is read and tailed.
+ go watcher.Start()
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == seriesCount && wt.samplesCount() == seriesCount*samplesCount
+ }, 3*time.Second, 100*time.Millisecond)
+
+ // Add the rest of the segments.
+			// Start measuring how long the watcher takes to react; we assume segment generation has little influence on that.
+ startTime := time.Now()
+ for i := 1; i < segmentsCount; i++ {
+ w.NextSegment()
+ require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
+ }
+
+			// If the watcher were to wait for readTimeout or segmentCheckPeriod before reading each new segment, it would need at least min(readTimeout, segmentCheckPeriod) * segmentsCount.
+ retry(t, defaultRetryInterval, defaultRetries, func() bool {
+ return wt.seriesCount() == segmentsCount*seriesCount && wt.samplesCount() == seriesCount*samplesCount*segmentsCount
+ })
+ watcher.Stop()
+
+ require.Less(t, time.Since(startTime), min(watcher.readTimeout, watcher.segmentCheckPeriod)*(segmentsCount/2))
+ })
+ }
+}
+
+// The following tests are specific to the stateful watcher.
+// TODO: should they run with all WAL compression types?
+
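+// generateSegments creates WAL segments from 0 up to the highest index in wantedSegments using
+// generateWALRecords, then deletes every segment not listed in wantedSegments.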
+// This should only be called once as it may mess with existing WAL segments.
+func generateSegments(t *testing.T, wdir string, wantedSegments []int, seriesCount, samplesCount int) *WL {
+ t.Helper()
+
+ if len(wantedSegments) == 0 {
+ return nil
+ }
+
+ w, err := NewSize(nil, nil, wdir, pageSize, CompressionNone)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ require.NoError(t, w.Close())
+ })
+ // Generate the segments.
+ lastSegment := slices.Max(wantedSegments)
+ require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+ for j := 1; j <= lastSegment; j++ {
+ w.NextSegment()
+ require.NoError(t, generateWALRecords(w, j, seriesCount, samplesCount))
+ }
+ // Delete the unwanted ones.
+ segs, err := listSegments(wdir)
+ require.NoError(t, err)
+ for _, seg := range segs {
+ if !slices.Contains(wantedSegments, seg.index) {
+ require.NoError(t, os.Remove(SegmentName(wdir, seg.index)))
+ }
+ }
+ // Sanity check
+ var existing []int
+ segs, err = listSegments(wdir)
+ require.NoError(t, err)
+ for _, seg := range segs {
+ existing = append(existing, seg.index)
+ }
+ require.ElementsMatch(t, wantedSegments, existing)
+ return w
+}
+
+// TestStateful_ProgressMarkerIsPersisted ensures that the right progress marker is persisted
+// when the watcher is stopped.
+func TestStateful_ProgressMarkerIsPersisted(t *testing.T) {
+ t.Parallel()
+
+ const seriesCount = 10
+ const samplesCount = 50
+
+ dir := t.TempDir()
+
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ watcherName := "bar"
+ progressFilePath := path.Join(wdir, "remote_write", fmt.Sprintf("%s.json", watcherName))
+ require.NoFileExists(t, progressFilePath)
+
+ w, err := NewSize(nil, nil, wdir, pageSize, CompressionNone)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+ require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, watcherName, wt, dir, false, false, false)
+ watcher.setMetrics()
+ go watcher.Start()
+ require.Eventually(t, func() bool {
+ return wt.samplesCount() == seriesCount*samplesCount
+ }, 5*time.Second, 100*time.Millisecond)
+ watcher.Stop()
+
+ progressMarker, err := os.ReadFile(progressFilePath)
+ require.NoError(t, err)
+ // Adjust offset if generateWALRecords changes.
+ require.JSONEq(t, `{"segmentIndex": 0,"offset": 17350}`, string(progressMarker))
+}
+
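+// createAndWriteTo creates the file at path, including any missing parent directories, and writes content to it.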
+func createAndWriteTo(t *testing.T, path, content string) {
+	t.Helper()
+
+	require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o700))
+ progressMarkerFile, err := os.Create(path)
+ require.NoError(t, err)
+ _, err = progressMarkerFile.WriteString(content)
+ require.NoError(t, err)
+ require.NoError(t, progressMarkerFile.Close())
+}
+
+func TestStateful_SamplesToReadOnStartup(t *testing.T) {
+ t.Parallel()
+
+ const seriesCount = 10
+ const samplesCount = 50
+
+ testCases := []struct {
+ name string
+ segments []int
+ checkpointFromTo []int
+ progressMarker string
+
+ expectedSamples int
+ expectedSeries int
+ expectedProgressMarker string
+ }{
+ {
+ name: "no progress marker",
+ segments: []int{0, 1, 2},
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 17350}`,
+ },
+		// TODO: adjust this case; how do we determine the number of samples expected to be read?
+ /* {
+ name: "with progress marker",
+ segments: []int{10, 11, 12},
+ progressMarker: `{"segmentIndex": 10,"offset": 320}`,
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 17350}`,
+ }, */
+ {
+ name: "progress marker segment > last segment",
+ segments: []int{0, 1, 2},
+ progressMarker: `{"segmentIndex": 5,"offset": 320}`,
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 17350}`,
+ },
+ {
+ name: "progress marker segment > last segment with checkpoint",
+ segments: []int{3, 4, 5, 6},
+ checkpointFromTo: []int{3, 5},
+ progressMarker: `{"segmentIndex": 7,"offset": 436}`,
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 4 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 6,"offset": 17350}`,
+ },
+ {
+ name: "progress marker segment < first segment",
+ segments: []int{2, 3, 4},
+ progressMarker: `{"segmentIndex": 0,"offset": 777}`,
+
+ expectedSamples: 3 * seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 4,"offset": 17350}`,
+ },
+ {
+ name: "progress marker segment < first segment with checkpoint",
+ segments: []int{6, 7, 8, 9},
+ checkpointFromTo: []int{6, 8},
+ progressMarker: `{"segmentIndex": 2,"offset": 4356}`,
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 4 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 9,"offset": 17350}`,
+ },
+ {
+ // This simulates a scenario where the last segment was replaced by another one containing less data.
+ // In such a case, the watcher will still use its previous marker.
+ name: "last segment replaced",
+ segments: []int{0, 1, 2},
+ progressMarker: `{"segmentIndex": 2,"offset": 9999999999}`,
+
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 9999999999}`,
+ },
+ {
+ name: "corrupted progress marker",
+ segments: []int{0, 1, 2},
+ progressMarker: `{"segmentInde`,
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 17350}`,
+ },
+ {
+ name: "checkpoint prior to first segment",
+ segments: []int{0, 1, 2},
+ checkpointFromTo: []int{0, 1},
+
+ expectedSamples: seriesCount * samplesCount,
+ expectedSeries: 3 * seriesCount,
+ expectedProgressMarker: `{"segmentIndex": 2,"offset": 17350}`,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ w := generateSegments(t, wdir, tc.segments, seriesCount, samplesCount)
+ if tc.checkpointFromTo != nil {
+				_, err := Checkpoint(log.NewNopLogger(), w, tc.checkpointFromTo[0], tc.checkpointFromTo[1], func(x chunks.HeadSeriesRef) bool { return true }, 0)
+				require.NoError(t, err)
+				require.NoError(t, w.Truncate(tc.checkpointFromTo[1] + 1))
+ }
+
+ watcherName := "bar"
+ progressMarkerPath := path.Join(wdir, "remote_write", fmt.Sprintf("%s.json", watcherName))
+ if tc.progressMarker != "" {
+ createAndWriteTo(t, progressMarkerPath, tc.progressMarker)
+ }
+
+ // Start the watcher and make sure the appropriate data was forwarded.
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, watcherName, wt, dir, false, false, false)
+ watcher.setMetrics()
+ go watcher.Start()
+
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == tc.expectedSeries && wt.samplesCount() == tc.expectedSamples
+ }, 5*time.Second, 100*time.Millisecond)
+ watcher.Stop()
+
+ // Check the final progress.
+ b, err := os.ReadFile(progressMarkerPath)
+ require.NoError(t, err)
+ require.JSONEq(t, tc.expectedProgressMarker, string(b))
+ })
+ }
+}
+
+func TestStateful_syncOnIteration(t *testing.T) {
+ t.Parallel()
+
+ const seriesCount = 10
+ const samplesCount = 50
+
+ testCases := []struct {
+ name string
+ progressMarker *ProgressMarker
+ segmentsState *segmentsRange
+ segments []int
+
+ expectedToFail bool
+ expectedProgressMarker *ProgressMarker
+ expectedSegmentsState *segmentsRange
+ }{
+ // First iteration
+ {
+ name: "first iteration without progress marker without segments",
+ expectedToFail: true,
+ },
+ {
+ name: "first iteration without progress marker with one segment=00000000",
+ segments: []int{0},
+ expectedProgressMarker: &ProgressMarker{},
+ expectedSegmentsState: &segmentsRange{},
+ },
+ {
+ name: "first iteration without progress marker with one segment>00000000",
+ segments: []int{3},
+ expectedProgressMarker: &ProgressMarker{SegmentIndex: 3},
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 3,
+ currentSegment: 3,
+ lastSegment: 3,
+ },
+ },
+ {
+ name: "first iteration without progress marker with many segments>=00000000",
+ segments: []int{0, 1, 2},
+ expectedProgressMarker: &ProgressMarker{SegmentIndex: 2},
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 0,
+ currentSegment: 0,
+ lastSegment: 2,
+ },
+ },
+ {
+ name: "first iteration without progress marker with many segments>00000000",
+ segments: []int{7, 8, 9},
+ expectedProgressMarker: &ProgressMarker{SegmentIndex: 9},
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 7,
+ currentSegment: 7,
+ lastSegment: 9,
+ },
+ },
+ {
+ name: "first iteration with progress marker without segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 3,
+ Offset: 123,
+ },
+ expectedToFail: true,
+ },
+ {
+ name: "first iteration with progress marker at segment=00000000",
+ progressMarker: &ProgressMarker{
+ Offset: 123,
+ },
+ segments: []int{0},
+ expectedProgressMarker: &ProgressMarker{Offset: 123},
+ expectedSegmentsState: &segmentsRange{},
+ },
+ {
+ name: "first iteration with progress marker > segment=00000000",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 10,
+ Offset: 987,
+ },
+ segments: []int{0},
+ expectedProgressMarker: &ProgressMarker{},
+ expectedSegmentsState: &segmentsRange{},
+ },
+ {
+			name: "first iteration with progress marker at segment>00000000",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 4,
+ Offset: 123,
+ },
+ segments: []int{4},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 4,
+ Offset: 123,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 4,
+ lastSegment: 4,
+ },
+ },
+ {
+ name: "first iteration with progress marker at segment-1",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 3,
+ Offset: 123,
+ },
+ segments: []int{4},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 4,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 4,
+ lastSegment: 4,
+ },
+ },
+ {
+ name: "first iteration with progress marker < segment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 2,
+ Offset: 333,
+ },
+ segments: []int{4},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 4,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 4,
+ lastSegment: 4,
+ },
+ },
+ {
+ name: "first iteration with progress marker at segment+1",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 123,
+ },
+ segments: []int{7},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 7,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 7,
+ currentSegment: 7,
+ lastSegment: 7,
+ },
+ },
+ {
+ name: "first iteration with progress marker > segment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 123,
+ },
+ segments: []int{4},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 4,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 4,
+ lastSegment: 4,
+ },
+ },
+ {
+ name: "first iteration with progress marker < many segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 2,
+ Offset: 123,
+ },
+ segments: []int{5, 6},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 5,
+ lastSegment: 6,
+ },
+ },
+ {
+ name: "first iteration with progress marker <= many segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 123,
+ },
+ segments: []int{5, 6},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 123,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 5,
+ lastSegment: 6,
+ },
+ },
+ {
+ name: "first iteration with progress marker between segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 123,
+ },
+ segments: []int{4, 5, 6},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 123,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 4,
+ lastSegment: 6,
+ },
+ },
+ {
+ name: "first iteration with progress marker > many segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 123,
+ },
+ segments: []int{1, 2, 3},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 3,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 1,
+ currentSegment: 1,
+ lastSegment: 3,
+ },
+ },
+ {
+ name: "first iteration with progress marker >= many segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 123,
+ },
+ segments: []int{6, 7, 8},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 123,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 6,
+ currentSegment: 6,
+ lastSegment: 8,
+ },
+ },
+ // Intermediate iteration
+ {
+ name: "intermediate iteration without segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 2,
+ Offset: 123,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 1,
+ currentSegment: 3,
+ lastSegment: 3,
+ },
+ expectedToFail: true,
+ },
+ {
+ name: "intermediate iteration same segments",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 6,
+ Offset: 951,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 7,
+ lastSegment: 7,
+ },
+ segments: []int{5, 6, 7},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 6,
+ Offset: 951,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 7,
+ lastSegment: 7,
+ },
+ },
+ {
+ name: "intermediate iteration firstSegment=prevFirstSegment and lastSegment>=prevLastSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 7,
+ Offset: 123,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 6,
+ lastSegment: 7,
+ },
+ segments: []int{5, 6, 7, 8},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 7,
+ Offset: 123,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 6,
+ lastSegment: 8,
+ },
+ },
+ {
+			name: "intermediate iteration with progress marker segment > firstSegment > prevFirstSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 963,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 4,
+ currentSegment: 5,
+ lastSegment: 9,
+ },
+ segments: []int{7, 8, 9},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 963,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 7,
+ currentSegment: 7,
+ lastSegment: 9,
+ },
+ },
+ {
+ name: "intermediate iteration with progress marker segment = firstSegment > prevFirstSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 558,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 6,
+ currentSegment: 6,
+ lastSegment: 9,
+ },
+ segments: []int{8, 9},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 8,
+ Offset: 558,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 8,
+ currentSegment: 8,
+ lastSegment: 9,
+ },
+ },
+ {
+ name: "intermediate iteration with firstSegment > progress marker segment >= prevFirstSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 3,
+ Offset: 558,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 0,
+ currentSegment: 9,
+ lastSegment: 10,
+ },
+ segments: []int{5, 6, 7},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 7,
+ lastSegment: 7,
+ },
+ },
+ {
+ name: "intermediate iteration with lastSegment < progress marker segment <= prevLastSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 7,
+ Offset: 558,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 7,
+ lastSegment: 8,
+ },
+ segments: []int{5, 6},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 6,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 6,
+ lastSegment: 6,
+ },
+ },
+ {
+ name: "intermediate iteration with progress marker segment <= lastSegment < prevLastSegment",
+ progressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 558,
+ },
+ segmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 6,
+ lastSegment: 8,
+ },
+ segments: []int{5, 6},
+ expectedProgressMarker: &ProgressMarker{
+ SegmentIndex: 5,
+ Offset: 558,
+ },
+ expectedSegmentsState: &segmentsRange{
+ firstSegment: 5,
+ currentSegment: 6,
+ lastSegment: 6,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ dir := t.TempDir()
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ generateSegments(t, wdir, tc.segments, seriesCount, samplesCount)
+
+ // Set up and run the watcher.
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, "test", wt, dir, false, false, false)
+ watcher.setMetrics()
+			// A non-nil value means we want to overwrite the watcher's default.
+ if tc.progressMarker != nil {
+ watcher.progressMarker = tc.progressMarker
+ }
+ if tc.segmentsState != nil {
+ watcher.segmentsState = tc.segmentsState
+ }
+
+ err := watcher.syncOnIteration()
+
+ if tc.expectedToFail {
+ require.Error(t, err)
+ return
+ }
+ require.NoError(t, err)
+
+ require.Equal(t, tc.expectedProgressMarker, watcher.progressMarker)
+ require.Equal(t, tc.expectedSegmentsState, watcher.segmentsState)
+ require.Equal(t, 0, wt.seriesCount())
+ require.Equal(t, 0, wt.samplesAppended)
+ })
+ }
+}
+
+func TestStateful_CatchAfterStop(t *testing.T) {
+ t.Parallel()
+
+ const seriesCount = 10
+ const samplesCount = 50
+
+ dir := t.TempDir()
+ wdir := path.Join(dir, "wal")
+ require.NoError(t, os.Mkdir(wdir, 0o777))
+
+ w, err := NewSize(nil, nil, wdir, pageSize, CompressionNone)
+ require.NoError(t, err)
+ defer func() {
+ require.NoError(t, w.Close())
+ }()
+ require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+
+ // Start the watcher and make sure the appropriate data was forwarded.
+ watcherName := "foo"
+ wt := newWriteToMock(0)
+ watcher := NewStatefulWatcher(wMetrics, nil, nil, watcherName, wt, dir, false, false, false)
+ watcher.setMetrics()
+ go watcher.Start()
+
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == seriesCount && wt.samplesCount() == seriesCount*samplesCount
+ }, 3*time.Second, 100*time.Millisecond)
+ // Stop the watcher.
+ watcher.Stop()
+
+ // Add more records to the current segment and add a new one.
+ require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
+ w.NextSegment()
+ require.NoError(t, generateWALRecords(w, 1, seriesCount, samplesCount))
+
+	// Restart the watcher and ensure it catches up.
+ wt = newWriteToMock(0)
+ watcher = NewStatefulWatcher(wMetrics, nil, nil, watcherName, wt, dir, false, false, false)
+ watcher.setMetrics()
+ go watcher.Start()
+
+ require.Eventually(t, func() bool {
+ return wt.seriesCount() == 2*seriesCount && wt.samplesCount() == 2*seriesCount*samplesCount
+ }, 3*time.Second, 100*time.Millisecond)
+ watcher.Stop()
+}
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index ac5041e87..40c5744a9 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -34,15 +34,21 @@ import (
)
const (
- checkpointPeriod = 5 * time.Second
- segmentCheckPeriod = 100 * time.Millisecond
- consumer = "consumer"
+ consumer = "consumer"
)
-var (
- ErrIgnorable = errors.New("ignore me")
- readTimeout = 15 * time.Second
-)
+var ErrIgnorable = errors.New("ignore me")
+
+// WALWatcher allows choosing between Watcher and StatefulWatcher.
+type WALWatcher interface {
+ // Used as an identifier for StatefulWatcher.
+ Name() string
+ Start()
+ Notify()
+ // Only makes sense for StatefulWatcher.
+ PurgeState()
+ Stop()
+}
// WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently
@@ -73,11 +79,10 @@ type WriteNotified interface {
}
type WatcherMetrics struct {
- recordsRead *prometheus.CounterVec
- recordDecodeFails *prometheus.CounterVec
- samplesSentPreTailing *prometheus.CounterVec
- currentSegment *prometheus.GaugeVec
- notificationsSkipped *prometheus.CounterVec
+ recordsRead *prometheus.CounterVec
+ recordDecodeFails *prometheus.CounterVec
+ currentSegment *prometheus.GaugeVec
+ notificationsSkipped *prometheus.CounterVec
}
// Watcher watches the TSDB WAL for a given WriteTo.
@@ -99,7 +104,6 @@ type Watcher struct {
recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter
- samplesSentPreTailing prometheus.Counter
currentSegmentMetric prometheus.Gauge
notificationsSkipped prometheus.Counter
@@ -107,6 +111,11 @@ type Watcher struct {
quit chan struct{}
done chan struct{}
+ // To ease their use in tests.
+ checkpointPeriod time.Duration
+ segmentCheckPeriod time.Duration
+ readTimeout time.Duration
+
// For testing, stop when we hit this segment.
MaxSegment int
}
@@ -131,15 +140,6 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer},
),
- samplesSentPreTailing: prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "prometheus",
- Subsystem: "wal_watcher",
- Name: "samples_sent_pre_tailing_total",
- Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
- },
- []string{consumer},
- ),
currentSegment: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "prometheus",
@@ -163,7 +163,6 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
if reg != nil {
reg.MustRegister(m.recordsRead)
reg.MustRegister(m.recordDecodeFails)
- reg.MustRegister(m.samplesSentPreTailing)
reg.MustRegister(m.currentSegment)
reg.MustRegister(m.notificationsSkipped)
}
@@ -191,10 +190,18 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
quit: make(chan struct{}),
done: make(chan struct{}),
+ checkpointPeriod: 5 * time.Second,
+ segmentCheckPeriod: 100 * time.Millisecond,
+ readTimeout: 15 * time.Second,
+
MaxSegment: -1,
}
}
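+// Name returns the name used to identify this watcher.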
+func (w *Watcher) Name() string {
+ return w.name
+}
+
func (w *Watcher) Notify() {
select {
case w.readNotify <- struct{}{}:
@@ -213,7 +220,6 @@ func (w *Watcher) setMetrics() {
if w.metrics != nil {
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
- w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
}
@@ -237,13 +243,16 @@ func (w *Watcher) Stop() {
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
- w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
}
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
}
+// PurgeState is a no-op: Watcher keeps no state to purge.
+func (w *Watcher) PurgeState() {
+ level.Debug(w.logger).Log("msg", "Watcher has no state to purge")
+}
+
func (w *Watcher) loop() {
defer close(w.done)
@@ -375,13 +384,13 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return w.readAndHandleError(reader, segmentNum, tail, size)
}
- checkpointTicker := time.NewTicker(checkpointPeriod)
+ checkpointTicker := time.NewTicker(w.checkpointPeriod)
defer checkpointTicker.Stop()
- segmentTicker := time.NewTicker(segmentCheckPeriod)
+ segmentTicker := time.NewTicker(w.segmentCheckPeriod)
defer segmentTicker.Stop()
- readTicker := time.NewTicker(readTimeout)
+ readTicker := time.NewTicker(w.readTimeout)
defer readTicker.Stop()
gcSem := make(chan struct{}, 1)
@@ -420,17 +429,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if last > segmentNum {
return w.readAndHandleError(reader, segmentNum, tail, size)
}
- continue
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C:
- level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
+ level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
return err
}
// reset the ticker so we don't read too often
- readTicker.Reset(readTimeout)
+ readTicker.Reset(w.readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
@@ -438,7 +446,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return err
}
// reset the ticker so we don't read too often
- readTicker.Reset(readTimeout)
+ readTicker.Reset(w.readTimeout)
}
}
}
@@ -452,9 +460,10 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
if dir == "" || dir == w.lastCheckpoint {
return nil
}
+	// TODO: shouldn't we set this later, after readCheckpoint?
w.lastCheckpoint = dir
- index, err := checkpointNum(dir)
+ index, err := checkpointIndex(dir)
if err != nil {
return fmt.Errorf("error parsing checkpoint filename: %w", err)
}
@@ -470,7 +479,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
return fmt.Errorf("readCheckpoint: %w", err)
}
- // Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
+ // Clear series with a checkpoint or segment index lower than the checkpoint index we just read.
w.writer.SeriesReset(index)
return nil
}
@@ -671,9 +680,9 @@ type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) er
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
- index, err := checkpointNum(checkpointDir)
+ index, err := checkpointIndex(checkpointDir)
if err != nil {
- return fmt.Errorf("checkpointNum: %w", err)
+ return fmt.Errorf("checkpointIndex: %w", err)
}
// Ensure we read the whole contents of every segment in the checkpoint dir.
@@ -708,7 +717,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
return nil
}
-func checkpointNum(dir string) (int, error) {
+func checkpointIndex(dir string) (int, error) {
// Checkpoint dir names are in the format checkpoint.000001
// dir may contain a hidden directory, so only check the base directory
chunks := strings.Split(filepath.Base(dir), ".")
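
An aside for reviewers before moving on to the tests: the hunks above make `Watcher` satisfy the new `WALWatcher` interface. Below is a minimal sketch (not part of the patch) of how a consumer could hold the interface rather than the concrete type; `startWALWatcher` and its parameter names are illustrative, the trailing booleans mirror the test call sites, and since the `StatefulWatcher` constructor is not shown in these hunks, the plain `Watcher` stands in for it.

```go
package example

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb/wlog"
)

// startWALWatcher is a hypothetical helper: the caller keeps only the
// WALWatcher surface (Name, Start, Notify, PurgeState, Stop), so swapping in
// the stateful implementation behind the feature flag does not change callers.
func startWALWatcher(logger log.Logger, metrics *wlog.WatcherMetrics, readerMetrics *wlog.LiveReaderMetrics,
	name string, writer wlog.WriteTo, walDir string) wlog.WALWatcher {
	// Trailing booleans are passed as in the watcher_test.go call sites below.
	var w wlog.WALWatcher = wlog.NewWatcher(metrics, readerMetrics, logger, name, writer, walDir, false, false, false)
	go w.Start()
	return w
}
```

Consumers would then call `Notify()` on WAL writes, `PurgeState()` where it applies, and `Stop()` on shutdown, without knowing which implementation is behind the interface.
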
diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go
index dc0314e8c..78a51d59d 100644
--- a/tsdb/wlog/watcher_test.go
+++ b/tsdb/wlog/watcher_test.go
@@ -17,7 +17,6 @@ import (
"math/rand"
"os"
"path"
- "runtime"
"sync"
"testing"
"time"
@@ -54,9 +53,13 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
type writeToMock struct {
samplesAppended int
+ samplesLock sync.Mutex
exemplarsAppended int
+ exemplarsLock sync.Mutex
histogramsAppended int
+ histogramsLock sync.Mutex
floatHistogramsAppended int
+ floatHistogramsLock sync.Mutex
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@@ -65,24 +68,32 @@ type writeToMock struct {
}
func (wtm *writeToMock) Append(s []record.RefSample) bool {
+ wtm.samplesLock.Lock()
+ defer wtm.samplesLock.Unlock()
time.Sleep(wtm.delay)
wtm.samplesAppended += len(s)
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
+ wtm.exemplarsLock.Lock()
+ defer wtm.exemplarsLock.Unlock()
time.Sleep(wtm.delay)
wtm.exemplarsAppended += len(e)
return true
}
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
+ wtm.histogramsLock.Lock()
+ defer wtm.histogramsLock.Unlock()
time.Sleep(wtm.delay)
wtm.histogramsAppended += len(h)
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
+ wtm.floatHistogramsLock.Lock()
+ defer wtm.floatHistogramsLock.Unlock()
time.Sleep(wtm.delay)
wtm.floatHistogramsAppended += len(fh)
return true
@@ -115,12 +126,36 @@ func (wtm *writeToMock) SeriesReset(index int) {
}
}
-func (wtm *writeToMock) checkNumSeries() int {
+func (wtm *writeToMock) seriesCount() int {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
return len(wtm.seriesSegmentIndexes)
}
+func (wtm *writeToMock) samplesCount() int {
+ wtm.samplesLock.Lock()
+ defer wtm.samplesLock.Unlock()
+ return wtm.samplesAppended
+}
+
+func (wtm *writeToMock) exemplarsCount() int {
+ wtm.exemplarsLock.Lock()
+ defer wtm.exemplarsLock.Unlock()
+ return wtm.exemplarsAppended
+}
+
+func (wtm *writeToMock) histogramsCount() int {
+ wtm.histogramsLock.Lock()
+ defer wtm.histogramsLock.Unlock()
+ return wtm.histogramsAppended
+}
+
+func (wtm *writeToMock) floatHistogramsCount() int {
+ wtm.floatHistogramsLock.Lock()
+ defer wtm.floatHistogramsLock.Unlock()
+ return wtm.floatHistogramsAppended
+}
+
func newWriteToMock(delay time.Duration) *writeToMock {
return &writeToMock{
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@@ -129,6 +164,8 @@ func newWriteToMock(delay time.Duration) *writeToMock {
}
func TestTailSamples(t *testing.T) {
+ t.Parallel()
+
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
@@ -136,6 +173,8 @@ func TestTailSamples(t *testing.T) {
const histogramsCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
now := time.Now()
dir := t.TempDir()
@@ -242,24 +281,28 @@ func TestTailSamples(t *testing.T) {
expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
- return wt.checkNumSeries() >= expectedSeries
+ return wt.seriesCount() >= expectedSeries
})
- require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
- require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
- require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
- require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
- require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms")
+ require.Equal(t, expectedSeries, wt.seriesCount(), "did not receive the expected number of series")
+ require.Equal(t, expectedSamples, wt.samplesCount(), "did not receive the expected number of samples")
+	require.Equal(t, expectedExemplars, wt.exemplarsCount(), "did not receive the expected number of exemplars")
+ require.Equal(t, expectedHistograms, wt.histogramsCount(), "did not receive the expected number of histograms")
+ require.Equal(t, expectedHistograms, wt.floatHistogramsCount(), "did not receive the expected number of float histograms")
})
}
}
func TestReadToEndNoCheckpoint(t *testing.T) {
+ t.Parallel()
+
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
dir := t.TempDir()
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
@@ -302,24 +345,27 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
}
}
require.NoError(t, w.Log(recs...))
- readTimeout = time.Second
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
go watcher.Start()
expected := seriesCount
require.Eventually(t, func() bool {
- return wt.checkNumSeries() == expected
+ return wt.seriesCount() == expected
}, 20*time.Second, 1*time.Second)
watcher.Stop()
})
}
}
+// TestReadToEndWithCheckpoint ensures the watcher reads series records from both the checkpoint and the remaining WAL segments.
func TestReadToEndWithCheckpoint(t *testing.T) {
+ t.Parallel()
+
segmentSize := 32 * 1024
// We need something similar to this # of series and samples
// in order to get enough segments for us to checkpoint.
@@ -328,6 +374,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@@ -394,15 +442,15 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
- readTimeout = time.Second
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
go watcher.Start()
expected := seriesCount * 2
require.Eventually(t, func() bool {
- return wt.checkNumSeries() == expected
+ return wt.seriesCount() == expected
}, 10*time.Second, 1*time.Second)
watcher.Stop()
})
@@ -410,12 +458,16 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
}
func TestReadCheckpoint(t *testing.T) {
+ t.Parallel()
+
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@@ -472,15 +524,17 @@ func TestReadCheckpoint(t *testing.T) {
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
- return wt.checkNumSeries() >= expectedSeries
+ return wt.seriesCount() >= expectedSeries
})
watcher.Stop()
- require.Equal(t, expectedSeries, wt.checkNumSeries())
+ require.Equal(t, expectedSeries, wt.seriesCount())
})
}
}
func TestReadCheckpointMultipleSegments(t *testing.T) {
+ t.Parallel()
+
pageSize := 32 * 1024
const segments = 1
@@ -489,6 +543,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
+ t.Parallel()
+
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@@ -552,6 +608,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}
func TestCheckpointSeriesReset(t *testing.T) {
+ t.Parallel()
+
segmentSize := 32 * 1024
// We need something similar to this # of series and samples
// in order to get enough segments for us to checkpoint.
@@ -607,18 +665,18 @@ func TestCheckpointSeriesReset(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
- readTimeout = time.Second
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
+ watcher.readTimeout = time.Second
watcher.MaxSegment = -1
go watcher.Start()
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
- return wt.checkNumSeries() >= expected
+ return wt.seriesCount() >= expected
})
require.Eventually(t, func() bool {
- return wt.checkNumSeries() == seriesCount
+ return wt.seriesCount() == seriesCount
}, 10*time.Second, 1*time.Second)
_, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0)
@@ -637,13 +695,16 @@ func TestCheckpointSeriesReset(t *testing.T) {
// many series records you end up with and change the last Equals check accordingly
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
require.Eventually(t, func() bool {
- return wt.checkNumSeries() == tc.segments
+ return wt.seriesCount() == tc.segments
}, 20*time.Second, 1*time.Second)
})
}
}
+// TestRun_StartupTime ensures that on startup, the watcher doesn't rely on readTimeout to read existing segments.
func TestRun_StartupTime(t *testing.T) {
+ t.Parallel()
+
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
@@ -651,6 +712,8 @@ func TestRun_StartupTime(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
+ t.Parallel()
+
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@@ -695,7 +758,7 @@ func TestRun_StartupTime(t *testing.T) {
startTime := time.Now()
err = watcher.Run()
- require.Less(t, time.Since(startTime), readTimeout)
+ require.Less(t, time.Since(startTime), watcher.readTimeout)
require.NoError(t, err)
})
}
@@ -732,60 +795,4 @@ func generateWALRecords(w *WL, segment, seriesCount, samplesCount int) error {
return nil
}
-func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
- if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
- t.SkipNow()
- }
- const segmentSize = pageSize // Smallest allowed segment size.
- const segmentsToWrite = 5
- const segmentsToRead = segmentsToWrite - 1
- const seriesCount = 10
- const samplesCount = 50
-
- // This test can take longer than intended to finish in cloud CI.
- readTimeout := 10 * time.Second
-
- for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
- t.Run(string(compress), func(t *testing.T) {
- dir := t.TempDir()
-
- wdir := path.Join(dir, "wal")
- err := os.Mkdir(wdir, 0o777)
- require.NoError(t, err)
-
- w, err := NewSize(nil, nil, wdir, segmentSize, compress)
- require.NoError(t, err)
- var wg sync.WaitGroup
- // Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
- require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
- w.NextSegment() // Force creation of the next segment
- wg.Add(1)
- go func() {
- defer wg.Done()
- for i := 1; i < segmentsToWrite; i++ {
- require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
- w.NextSegment()
- }
- }()
-
- wt := newWriteToMock(time.Millisecond)
- watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
- watcher.MaxSegment = segmentsToRead
-
- watcher.setMetrics()
- startTime := time.Now()
- err = watcher.Run()
- wg.Wait()
- require.Less(t, time.Since(startTime), readTimeout)
-
- // But samples records shouldn't get dropped
- retry(t, defaultRetryInterval, defaultRetries, func() bool {
- return wt.checkNumSeries() > 0
- })
- require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
-
- require.NoError(t, err)
- require.NoError(t, w.Close())
- })
- }
-}
+// TODO: add TestRun_AvoidNotifyWhenBehind back
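
For reference, one possible shape for the restored test on top of this patch (a sketch only, not part of the change): it reuses the removed body above, needs the `"runtime"` import added back, switches to the renamed `seriesCount()`/`samplesCount()` accessors, and renames the old local `readTimeout` bound to `testTimeout` to avoid confusion with the new `Watcher` field.

```go
func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
	if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
		t.SkipNow()
	}
	const segmentSize = pageSize // Smallest allowed segment size.
	const segmentsToWrite = 5
	const segmentsToRead = segmentsToWrite - 1
	const seriesCount = 10
	const samplesCount = 50

	// This test can take longer than intended to finish in cloud CI.
	// With readTimeout now a Watcher field, this is only an assertion bound.
	testTimeout := 10 * time.Second

	for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
		t.Run(string(compress), func(t *testing.T) {
			dir := t.TempDir()

			wdir := path.Join(dir, "wal")
			err := os.Mkdir(wdir, 0o777)
			require.NoError(t, err)

			w, err := NewSize(nil, nil, wdir, segmentSize, compress)
			require.NoError(t, err)
			var wg sync.WaitGroup
			// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
			require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
			w.NextSegment() // Force creation of the next segment.
			wg.Add(1)
			go func() {
				defer wg.Done()
				for i := 1; i < segmentsToWrite; i++ {
					require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
					w.NextSegment()
				}
			}()

			wt := newWriteToMock(time.Millisecond)
			watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
			watcher.MaxSegment = segmentsToRead

			watcher.setMetrics()
			startTime := time.Now()
			err = watcher.Run()
			wg.Wait()
			require.Less(t, time.Since(startTime), testTimeout)

			// But sample records shouldn't get dropped.
			retry(t, defaultRetryInterval, defaultRetries, func() bool {
				return wt.seriesCount() > 0
			})
			require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesCount())

			require.NoError(t, err)
			require.NoError(t, w.Close())
		})
	}
}
```
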
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index ef9d53dd9..8aa0286ae 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -486,7 +486,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
- }, dbDir, 1*time.Second, nil, false)
+ }, dbDir, 1*time.Second, nil, false, false)
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{