feat(remote_write): allow using a stateful WAL watcher

behind --enable-feature=remote-write-stateful-watcher

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
machine424 2024-08-12 18:05:14 +02:00
parent 1e01e97151
commit 9f6c250399
18 changed files with 2823 additions and 316 deletions


@ -161,12 +161,13 @@ type flagConfig struct {
memlimitRatio float64
// These options are extracted from featureList
// for ease of use.
enableExpandExternalLabels bool
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool
enableExpandExternalLabels bool
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
enableAutoGOMEMLIMIT bool
enableConcurrentRuleEval bool
enableRemoteWriteStatefulWatcher bool
prometheusURL string
corsRegexString string
@ -259,6 +260,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
continue
case "promql-at-modifier", "promql-negative-offset":
level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", o)
case "remote-write-stateful-watcher":
c.enableRemoteWriteStatefulWatcher = true
level.Info(logger).Log("msg", "Experimental remote write stateful watcher enabled.")
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
@ -504,7 +508,7 @@ func main() {
a.Flag("scrape.name-escaping-scheme", `Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots".`).Default(scrape.DefaultNameEscapingScheme.String()).StringVar(&cfg.nameEscapingScheme)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), remote-write-stateful-watcher, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@ -691,9 +695,10 @@ func main() {
level.Info(logger).Log("vm_limits", prom_runtime.VMLimits())
var (
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata)
localStorage = &readyStorage{stats: tsdb.NewDBStats()}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, cfg.scrape.AppendMetadata, cfg.enableRemoteWriteStatefulWatcher)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)


@ -391,9 +391,10 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
if rwcfg == nil {
return errors.New("empty or null remote write config section")
}
// Skip empty names, we fill their name with their config hash in remote write code.
// Skip empty names, we could fill their name with their config hash in remote write code.
if _, ok := rwNames[rwcfg.Name]; ok && rwcfg.Name != "" {
return fmt.Errorf("found multiple remote write configs with job name %q", rwcfg.Name)
return fmt.Errorf("found multiple remote write configs with name %q", rwcfg.Name)
}
rwNames[rwcfg.Name] = struct{}{}
}


@ -1850,7 +1850,7 @@ var expectedErrors = []struct {
},
{
filename: "remote_write_dup.bad.yml",
errMsg: `found multiple remote write configs with job name "queue1"`,
errMsg: `found multiple remote write configs with name "queue1"`,
},
{
filename: "remote_read_dup.bad.yml",


@ -58,7 +58,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--scrape.name-escaping-scheme</code> | Method for escaping legacy invalid names when sending to Prometheus that does not support UTF-8. Can be one of "values", "underscores", or "dots". | `values` |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...<code class="text-nowrap"> | Comma separated feature names to enable. Valid options: agent, auto-gomaxprocs, auto-gomemlimit, auto-reload-config, concurrent-rule-eval, created-timestamp-zero-ingestion, delayed-compaction, exemplar-storage, expand-external-labels, extra-scrape-metrics, memory-snapshot-on-shutdown, native-histograms, new-service-discovery-manager, no-default-scrape-port, otlp-write-receiver, promql-experimental-functions, promql-delayed-name-removal, promql-per-step-stats, remote-write-receiver (DEPRECATED), remote-write-stateful-watcher, utf8-names. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |


@ -277,3 +277,7 @@ specified interval. The interval is defined by the
Configuration reloads are triggered by detecting changes in the checksum of the
main configuration file or any referenced files, such as rule and scrape
configurations.
## Remote Write Stateful Watcher
Enabled with `--enable-feature=remote-write-stateful-watcher`. Each remote write queue then uses a stateful WAL watcher that persists a progress marker (the index of the WAL segment it is reading and the offset of data already forwarded) and resumes from that position after a restart. The marker file is named after the remote write configuration, so every `remote_write` entry must set a non-empty, unique `name` while this flag is enabled.
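As a rough, non-normative sketch of what gets persisted (the config name `my-remote-name`, the values, and the temporary directory are made up for illustration), this standalone Go program writes a marker of the same shape to `<wal-dir>/remote_write/<name>.json`:

```go
// Illustrative only: mirrors the marker shape added in this change; it is not
// Prometheus code and does not use the wlog package.
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// progressMarker mirrors wlog.ProgressMarker: the last WAL segment the watcher
// processed and the byte offset within it that has already been forwarded.
type progressMarker struct {
	SegmentIndex int   `json:"segmentIndex"`
	Offset       int64 `json:"offset"`
}

func main() {
	walDir := os.TempDir() // stand-in for Prometheus' <data-dir>/wal
	markerPath := filepath.Join(walDir, "remote_write", "my-remote-name.json")

	if err := os.MkdirAll(filepath.Dir(markerPath), 0o777); err != nil {
		panic(err)
	}
	b, err := json.MarshalIndent(progressMarker{SegmentIndex: 42, Offset: 16384}, "", "\t")
	if err != nil {
		panic(err)
	}
	if err := os.WriteFile(markerPath, b, 0o666); err != nil {
		panic(err)
	}
	fmt.Println("progress marker written to", markerPath)
	fmt.Println(string(b))
}
```

In the actual implementation the marker is written atomically: the JSON is first written to a `.tmp` file, synced, and then moved into place.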


@ -415,7 +415,7 @@ type QueueManager struct {
relabelConfigs []*relabel.Config
sendExemplars bool
sendNativeHistograms bool
watcher *wlog.Watcher
watcher wlog.WALWatcher
metadataWatcher *MetadataWatcher
clientMtx sync.RWMutex
@ -456,6 +456,7 @@ func NewQueueManager(
readerMetrics *wlog.LiveReaderMetrics,
logger log.Logger,
dir string,
statefulWatcher bool,
samplesIn *ewmaRate,
cfg config.QueueConfig,
mCfg config.MetadataConfig,
@ -519,7 +520,11 @@ func NewQueueManager(
if t.protoMsg != config.RemoteWriteProtoMsgV1 {
walMetadata = true
}
t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
if statefulWatcher {
t.watcher = wlog.NewStatefulWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, walMetadata)
}
// The current MetadataWatcher implementation is mutually exclusive
// with the new approach, which stores metadata as WAL records and


@ -17,11 +17,9 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"os"
"runtime/pprof"
"sort"
"strconv"
"strings"
"sync"
@ -133,7 +131,7 @@ func TestBasicContentNegotiation(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
var (
@ -241,7 +239,7 @@ func TestSampleDelivery(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
var (
@ -309,17 +307,17 @@ func TestSampleDelivery(t *testing.T) {
}
}
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg) (*TestWriteClient, *QueueManager) {
func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, protoMsg config.RemoteWriteProtoMsg, statefulWatcher bool) (*TestWriteClient, *QueueManager) {
c := NewTestWriteClient(protoMsg)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
return c, newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, statefulWatcher)
}
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg, statefulWatcher bool) *QueueManager {
dir := t.TempDir()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
m := NewQueueManager(metrics, nil, nil, nil, dir, statefulWatcher, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, protoMsg)
return m
}
@ -332,7 +330,7 @@ func testDefaultQueueConfig() config.QueueConfig {
}
func TestMetadataDelivery(t *testing.T) {
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
m.Start()
defer m.Stop()
@ -360,7 +358,7 @@ func TestMetadataDelivery(t *testing.T) {
func TestWALMetadataDelivery(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, true, false)
defer s.Close()
cfg := config.DefaultQueueConfig
@ -410,7 +408,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -447,7 +445,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
})
}
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg, false)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
@ -467,7 +465,7 @@ func TestShutdown(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1, false)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -502,7 +500,7 @@ func TestSeriesReset(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(t, cfg, mcfg, deadline, c, config.RemoteWriteProtoMsgV1, false)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -527,7 +525,7 @@ func TestReshard(t *testing.T) {
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg, false)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
@ -566,7 +564,7 @@ func TestReshardRaceWithStop(t *testing.T) {
exitCh := make(chan struct{})
go func() {
for {
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
m = newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg, false)
m.Start()
h.Unlock()
@ -605,7 +603,7 @@ func TestReshardPartialBatch(t *testing.T) {
flushDeadline := 10 * time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
@ -652,7 +650,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
batchSendDeadline := time.Millisecond
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg, false)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -679,7 +677,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
func TestReleaseNoninternedString(t *testing.T) {
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg, false)
m.Start()
defer m.Stop()
for i := 1; i < 1000; i++ {
@ -727,7 +725,7 @@ func TestShouldReshard(t *testing.T) {
}
for _, c := range cases {
_, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1)
_, m := newTestClientAndQueueManager(t, time.Duration(c.sendDeadline), config.RemoteWriteProtoMsgV1, false)
m.numShards = c.startingShards
m.dataIn.incr(c.samplesIn)
m.dataOut.incr(c.samplesOut)
@ -772,7 +770,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
}
)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, "", false, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.StoreSeries(fakeSeries, 0)
// Attempt to samples while the manager is running. We immediately stop the
@ -1326,7 +1324,7 @@ func BenchmarkSampleSend(b *testing.B) {
cfg.MaxShards = 20
// todo: test with new proto type(s)
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1, false)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -1385,7 +1383,7 @@ func BenchmarkStoreSeries(b *testing.B) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m := NewQueueManager(metrics, nil, nil, nil, dir, false, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, config.RemoteWriteProtoMsgV1)
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
@ -1395,7 +1393,8 @@ func BenchmarkStoreSeries(b *testing.B) {
}
}
func BenchmarkStartup(b *testing.B) {
// TODO: adjust this
/* func BenchmarkStartup(b *testing.B) {
dir := os.Getenv("WALDIR")
if dir == "" {
b.Skip("WALDIR env var not set")
@ -1431,7 +1430,7 @@ func BenchmarkStartup(b *testing.B) {
err := m.watcher.Run()
require.NoError(b, err)
}
}
} */
func TestProcessExternalLabels(t *testing.T) {
b := labels.NewBuilder(labels.EmptyLabels())
@ -1504,7 +1503,7 @@ func TestProcessExternalLabels(t *testing.T) {
func TestCalculateDesiredShards(t *testing.T) {
cfg := config.DefaultQueueConfig
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
samplesIn := m.dataIn
// Need to start the queue manager so the proper metrics are initialized.
@ -1574,7 +1573,7 @@ func TestCalculateDesiredShards(t *testing.T) {
}
func TestCalculateDesiredShardsDetail(t *testing.T) {
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1)
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, config.RemoteWriteProtoMsgV1, false)
samplesIn := m.dataIn
for _, tc := range []struct {
@ -1952,7 +1951,7 @@ func TestDropOldTimeSeries(t *testing.T) {
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.SampleAgeLimit = model.Duration(60 * time.Second)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, config.RemoteWriteProtoMsgV1, false)
m.StoreSeries(series, 0)
m.Start()
@ -1987,7 +1986,7 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
metadataCfg.SendInterval = model.Duration(time.Second * 60)
metadataCfg.MaxSamplesPerSend = maxSamplesPerSend
c := NewTestWriteClient(config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, config.RemoteWriteProtoMsgV1)
m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c, config.RemoteWriteProtoMsgV1, false)
m.Start()


@ -93,7 +93,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,


@ -62,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL bool) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWAL, statefulWatcher bool) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logger,
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, metadataInWAL, statefulWatcher)
return s
}


@ -29,7 +29,7 @@ import (
func TestStorageLifecycle(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -56,7 +56,7 @@ func TestStorageLifecycle(t *testing.T) {
func TestUpdateRemoteReadConfigs(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -77,7 +77,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
func TestFilterExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -102,7 +102,7 @@ func TestFilterExternalLabels(t *testing.T) {
func TestIgnoreExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{
@ -154,7 +154,7 @@ func baseRemoteReadConfig(host string) *config.RemoteReadConfig {
// ApplyConfig runs concurrently with Notify
// See https://github.com/prometheus/prometheus/issues/12747
func TestWriteStorageApplyConfigsDuringCommit(t *testing.T) {
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false)
s := NewStorage(nil, nil, nil, t.TempDir(), defaultFlushDeadline, nil, false, false)
var wg sync.WaitGroup
wg.Add(2000)


@ -18,10 +18,13 @@ import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -73,12 +76,14 @@ type WriteStorage struct {
scraper ReadyScrapeManager
quit chan struct{}
statefulWatcher bool
// For timestampTracker.
highestTimestamp *maxTimestamp
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal bool) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, metadataInWal, statefulWatcher bool) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
@ -95,6 +100,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
scraper: sm,
quit: make(chan struct{}),
metadataInWAL: metadataInWal,
statefulWatcher: statefulWatcher,
highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@ -124,6 +130,49 @@ func (rws *WriteStorage) run() {
}
}
// purgeStaleWatchersMarkers iterates over progress marker files and deletes those of stateful watchers that are no longer in use.
// rws.mtx should be held before calling this.
func (rws *WriteStorage) purgeStaleWatchersMarkers() {
markersDir := filepath.Join(rws.dir, "wal", wlog.ProgressMarkerDirName)
files, err := os.ReadDir(markersDir)
if err != nil {
level.Error(rws.logger).Log(
"msg", "Failed to read the directory containing the stateful watchers' progress marker files",
"dir", markersDir,
"err", err,
)
}
statefulWatchers := make(map[string]struct{})
for _, q := range rws.queues {
statefulWatchers[q.watcher.Name()] = struct{}{}
}
for _, f := range files {
fileName := f.Name()
if filepath.Ext(fileName) != wlog.ProgressMarkerFileExt {
level.Warn(rws.logger).Log(
"msg", "the file doesn't belong in the directory",
"file", fileName,
"dir", markersDir,
)
continue
}
watcherName := fileName[:len(fileName)-len(wlog.ProgressMarkerFileExt)]
if _, inUse := statefulWatchers[watcherName]; !inUse {
err = os.Remove(filepath.Join(markersDir, fileName))
if err != nil {
level.Error(rws.logger).Log(
"msg", "Failed to remove progress marker file",
"dir", markersDir,
"file", fileName,
"err", err,
)
}
}
}
}
func (rws *WriteStorage) Notify() {
rws.mtx.Lock()
defer rws.mtx.Unlock()
@ -148,9 +197,14 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newQueues := make(map[string]*QueueManager)
newHashes := []string{}
for _, rwConf := range conf.RemoteWriteConfigs {
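// A stateful watcher persists its progress under a marker file named after the remote write config,
// so an explicit name is required when the feature flag is enabled.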
if rwConf.Name == "" && rws.statefulWatcher {
return fmt.Errorf("empty name for URL: %s, the `--enable-feature=remote-write-stateful-watcher` feature flag requires it to be set as it's used as an identifier", rwConf.URL)
}
if rwConf.ProtobufMessage == config.RemoteWriteProtoMsgV2 && !rws.metadataInWAL {
return errors.New("invalid remote write configuration, if you are using remote write version 2.0 the `--enable-feature=metadata-wal-records` feature flag must be enabled")
}
hash, err := toHash(rwConf)
if err != nil {
return err
@ -203,6 +257,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.liveReaderMetrics,
rws.logger,
rws.dir,
rws.statefulWatcher,
rws.samplesIn,
rwConf.QueueConfig,
rwConf.MetadataConfig,
@ -221,10 +276,25 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
newHashes = append(newHashes, hash)
}
// Anything remaining in rws.queues is a queue who's config has
// Used to identify stale stateful watchers whose state should be purged, see below.
newStatefulWatchers := make(map[string]struct{})
if rws.statefulWatcher {
for _, q := range newQueues {
newStatefulWatchers[q.watcher.Name()] = struct{}{}
}
}
// Anything remaining in rws.queues is a queue whose config has
// changed or was removed from the overall remote write config.
for _, q := range rws.queues {
q.Stop()
if rws.statefulWatcher {
// Purge the stateful watcher's state if its corresponding queue config was removed.
if _, found := newStatefulWatchers[q.watcher.Name()]; !found {
q.watcher.PurgeState()
}
}
}
for _, hash := range newHashes {
@ -271,6 +341,9 @@ func (rws *WriteStorage) Close() error {
for _, q := range rws.queues {
q.Stop()
}
if rws.statefulWatcher {
rws.purgeStaleWatchersMarkers()
}
close(rws.quit)
return nil
}


@ -16,9 +16,12 @@ package remote
import (
"bytes"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
"testing"
"time"
@ -35,9 +38,9 @@ import (
"github.com/prometheus/prometheus/model/relabel"
)
func testRemoteWriteConfig() *config.RemoteWriteConfig {
func testRemoteWriteConfig(name string) *config.RemoteWriteConfig {
return &config.RemoteWriteConfig{
Name: "dev",
Name: name,
URL: &common_config.URL{
URL: &url.URL{
Scheme: "http",
@ -105,7 +108,7 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -124,36 +127,40 @@ func TestWriteStorageApplyConfig_NoDuplicateWriteConfigs(t *testing.T) {
}
func TestWriteStorageApplyConfig_RestartOnNameChange(t *testing.T) {
dir := t.TempDir()
for _, statefulWatcher := range []bool{true, false} {
t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
dir := t.TempDir()
cfg := testRemoteWriteConfig()
cfg := testRemoteWriteConfig("dev")
hash, err := toHash(cfg)
require.NoError(t, err)
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, statefulWatcher)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg},
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg},
}
require.NoError(t, s.ApplyConfig(conf))
require.Equal(t, s.queues[hash].client().Name(), cfg.Name)
// Change the queues name, ensure the queue has been restarted.
conf.RemoteWriteConfigs[0].Name = "dev-2"
require.NoError(t, s.ApplyConfig(conf))
hash, err = toHash(cfg)
require.NoError(t, err)
require.Equal(t, s.queues[hash].client().Name(), conf.RemoteWriteConfigs[0].Name)
require.NoError(t, s.Close())
})
}
require.NoError(t, s.ApplyConfig(conf))
require.Equal(t, s.queues[hash].client().Name(), cfg.Name)
// Change the queues name, ensure the queue has been restarted.
conf.RemoteWriteConfigs[0].Name = "dev-2"
require.NoError(t, s.ApplyConfig(conf))
hash, err = toHash(cfg)
require.NoError(t, err)
require.Equal(t, s.queues[hash].client().Name(), conf.RemoteWriteConfigs[0].Name)
require.NoError(t, s.Close())
}
func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, false, false)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -194,7 +201,7 @@ func TestWriteStorageApplyConfig_UpdateWithRegisterer(t *testing.T) {
func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, false)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -207,16 +214,93 @@ func TestWriteStorageApplyConfig_Lifecycle(t *testing.T) {
require.NoError(t, s.Close())
}
func TestWriteStorageApplyConfig_StatefulWatcher(t *testing.T) {
dir := t.TempDir()
markersDir := path.Join(dir, "wal", "remote_write")
// WriteStorage with a WAL stateful watcher.
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
cfg1 := testRemoteWriteConfig("")
cfg2 := testRemoteWriteConfig("bar")
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{cfg1, cfg2},
}
// 1. remote-write config cfg1 without a name is not accepted.
require.Error(t, s.ApplyConfig(conf))
require.Empty(t, s.queues)
// 2. two remote-write configs with the same name are not accepted.
conf.RemoteWriteConfigs[0].Name = "bar"
require.Error(t, s.ApplyConfig(conf))
require.Empty(t, s.queues)
// 3. Give cfg1 a unique, non-empty name.
conf.RemoteWriteConfigs[0].Name = "foo"
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 2)
// Ensure the watchers have the appropriate names.
hash1, err := toHash(conf.RemoteWriteConfigs[0])
require.NoError(t, err)
require.Equal(t, "foo", s.queues[hash1].watcher.Name())
hash2, err := toHash(conf.RemoteWriteConfigs[1])
require.NoError(t, err)
require.Equal(t, "bar", s.queues[hash2].watcher.Name())
// Ensure the progress marker files were created.
require.FileExists(t, path.Join(markersDir, "foo.json"))
require.FileExists(t, path.Join(markersDir, "bar.json"))
// 4. Simulate a remote-write config entry removal by changing cfg2's name.
conf.RemoteWriteConfigs[1].Name = "baz"
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 2)
// Ensure the appropriate progress marker files are present.
require.FileExists(t, path.Join(markersDir, "foo.json"))
require.FileExists(t, path.Join(markersDir, "baz.json"))
require.NoFileExists(t, path.Join(markersDir, "bar.json"))
require.NoError(t, s.Close())
// Ensure progress marker files weren't deleted.
require.FileExists(t, path.Join(markersDir, "foo.json"))
require.FileExists(t, path.Join(markersDir, "baz.json"))
}
func TestWriteStorageCleanup_StatefulWatcher(t *testing.T) {
dir := t.TempDir()
// There are some stale progress marker files around without corresponding remote-write config entries.
markersDir := path.Join(dir, "wal", "remote_write")
require.NoError(t, os.MkdirAll(markersDir, 0o700))
f, err := os.Create(path.Join(markersDir, "foo.json"))
require.NoError(t, err)
require.NoError(t, f.Close())
f, err = os.Create(path.Join(markersDir, "bar.json"))
require.NoError(t, err)
require.NoError(t, f.Close())
// WriteStorage should clean them while closing.
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, false, true)
require.NoError(t, s.Close())
require.NoFileExists(t, path.Join(markersDir, "foo.json"))
require.NoFileExists(t, path.Join(markersDir, "bar.json"))
}
func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, false, false)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
testRemoteWriteConfig(),
testRemoteWriteConfig("dev"),
},
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
@ -236,136 +320,156 @@ func TestWriteStorageApplyConfig_UpdateExternalLabels(t *testing.T) {
}
func TestWriteStorageApplyConfig_Idempotent(t *testing.T) {
dir := t.TempDir()
for _, statefulWatcher := range []bool{true, false} {
t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
baseRemoteWriteConfig("http://test-storage.com"),
},
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, statefulWatcher)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{
baseRemoteWriteConfig("http://test-storage.com"),
},
}
if statefulWatcher {
conf.RemoteWriteConfigs[0].Name = "foo"
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
require.NoError(t, err)
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 1)
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 1)
_, hashExists := s.queues[hash]
require.True(t, hashExists, "Queue pointer should have remained the same")
require.NoError(t, s.Close())
})
}
hash, err := toHash(conf.RemoteWriteConfigs[0])
require.NoError(t, err)
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 1)
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 1)
_, hashExists := s.queues[hash]
require.True(t, hashExists, "Queue pointer should have remained the same")
require.NoError(t, s.Close())
}
func TestWriteStorageApplyConfig_PartialUpdate(t *testing.T) {
dir := t.TempDir()
for _, statefulWatcher := range []bool{true, false} {
t.Run(fmt.Sprintf("statefulWatcher=%t", statefulWatcher), func(t *testing.T) {
dir := t.TempDir()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, false, statefulWatcher)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),
QueueConfig: config.DefaultQueueConfig,
WriteRelabelConfigs: []*relabel.Config{
{
Regex: relabel.MustNewRegexp(".+"),
},
},
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
c1 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(20 * time.Second),
QueueConfig: config.DefaultQueueConfig,
HTTPClientConfig: common_config.HTTPClientConfig{
BearerToken: "foo",
},
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
c2 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: config.DefaultQueueConfig,
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),
QueueConfig: config.DefaultQueueConfig,
WriteRelabelConfigs: []*relabel.Config{
{
Regex: relabel.MustNewRegexp(".+"),
},
},
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
if statefulWatcher {
c0.Name = "foo"
}
c1 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(20 * time.Second),
QueueConfig: config.DefaultQueueConfig,
HTTPClientConfig: common_config.HTTPClientConfig{
BearerToken: "foo",
},
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
if statefulWatcher {
c1.Name = "bar"
}
c2 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: config.DefaultQueueConfig,
ProtobufMessage: config.RemoteWriteProtoMsgV1,
}
if statefulWatcher {
c2.Name = "baz"
}
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
// We need to set URL's so that metric creation doesn't panic.
for i := range conf.RemoteWriteConfigs {
conf.RemoteWriteConfigs[i].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 3)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
// We need to set URL's so that metric creation doesn't panic.
for i := range conf.RemoteWriteConfigs {
conf.RemoteWriteConfigs[i].URL = &common_config.URL{
URL: &url.URL{
Host: "http://test-storage.com",
},
}
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 3)
hashes := make([]string, len(conf.RemoteWriteConfigs))
queues := make([]*QueueManager, len(conf.RemoteWriteConfigs))
storeHashes := func() {
for i := range conf.RemoteWriteConfigs {
hash, err := toHash(conf.RemoteWriteConfigs[i])
hashes := make([]string, len(conf.RemoteWriteConfigs))
queues := make([]*QueueManager, len(conf.RemoteWriteConfigs))
storeHashes := func() {
for i := range conf.RemoteWriteConfigs {
hash, err := toHash(conf.RemoteWriteConfigs[i])
require.NoError(t, err)
hashes[i] = hash
queues[i] = s.queues[hash]
}
}
storeHashes()
// Update c0 and c2.
c0.WriteRelabelConfigs[0] = &relabel.Config{Regex: relabel.MustNewRegexp("foo")}
c2.RemoteTimeout = model.Duration(50 * time.Second)
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 3)
_, hashExists := s.queues[hashes[0]]
require.False(t, hashExists, "The queue for the first remote write configuration should have been restarted because the relabel configuration has changed.")
q, hashExists := s.queues[hashes[1]]
require.True(t, hashExists, "Hash of unchanged queue should have remained the same")
require.Equal(t, q, queues[1], "Pointer of unchanged queue should have remained the same")
_, hashExists = s.queues[hashes[2]]
require.False(t, hashExists, "The queue for the third remote write configuration should have been restarted because the timeout has changed.")
storeHashes()
secondClient := s.queues[hashes[1]].client()
// Update c1.
c1.HTTPClientConfig.BearerToken = "bar"
err := s.ApplyConfig(conf)
require.NoError(t, err)
hashes[i] = hash
queues[i] = s.queues[hash]
}
require.Len(t, s.queues, 3)
_, hashExists = s.queues[hashes[0]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
q, hashExists = s.queues[hashes[1]]
require.True(t, hashExists, "Hash of queue with secret change should have remained the same")
require.NotEqual(t, secondClient, q.client(), "Pointer of a client with a secret change should not be the same")
_, hashExists = s.queues[hashes[2]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
storeHashes()
// Delete c0.
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 2)
_, hashExists = s.queues[hashes[0]]
require.False(t, hashExists, "If a config is removed, the queue should be stopped and recreated.")
_, hashExists = s.queues[hashes[1]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
_, hashExists = s.queues[hashes[2]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
require.NoError(t, s.Close())
})
}
storeHashes()
// Update c0 and c2.
c0.WriteRelabelConfigs[0] = &relabel.Config{Regex: relabel.MustNewRegexp("foo")}
c2.RemoteTimeout = model.Duration(50 * time.Second)
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2},
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 3)
_, hashExists := s.queues[hashes[0]]
require.False(t, hashExists, "The queue for the first remote write configuration should have been restarted because the relabel configuration has changed.")
q, hashExists := s.queues[hashes[1]]
require.True(t, hashExists, "Hash of unchanged queue should have remained the same")
require.Equal(t, q, queues[1], "Pointer of unchanged queue should have remained the same")
_, hashExists = s.queues[hashes[2]]
require.False(t, hashExists, "The queue for the third remote write configuration should have been restarted because the timeout has changed.")
storeHashes()
secondClient := s.queues[hashes[1]].client()
// Update c1.
c1.HTTPClientConfig.BearerToken = "bar"
err := s.ApplyConfig(conf)
require.NoError(t, err)
require.Len(t, s.queues, 3)
_, hashExists = s.queues[hashes[0]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
q, hashExists = s.queues[hashes[1]]
require.True(t, hashExists, "Hash of queue with secret change should have remained the same")
require.NotEqual(t, secondClient, q.client(), "Pointer of a client with a secret change should not be the same")
_, hashExists = s.queues[hashes[2]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
storeHashes()
// Delete c0.
conf = &config.Config{
GlobalConfig: config.GlobalConfig{},
RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2},
}
require.NoError(t, s.ApplyConfig(conf))
require.Len(t, s.queues, 2)
_, hashExists = s.queues[hashes[0]]
require.False(t, hashExists, "If a config is removed, the queue should be stopped and recreated.")
_, hashExists = s.queues[hashes[1]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
_, hashExists = s.queues[hashes[2]]
require.True(t, hashExists, "Pointer of unchanged queue should have remained the same")
require.NoError(t, s.Close())
}
func TestOTLPWriteHandler(t *testing.T) {


@ -89,7 +89,7 @@ func createTestAgentDB(t testing.TB, reg prometheus.Registerer, opts *Options) *
t.Helper()
dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false)
rs := remote.NewStorage(log.NewNopLogger(), reg, startTime, dbDir, time.Second*30, nil, false, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -585,7 +585,7 @@ func TestLockfile(t *testing.T) {
tsdbutil.TestDirLockerUsage(t, func(t *testing.T, data string, createLock bool) (*tsdbutil.DirLocker, testutil.Closer) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false)
rs := remote.NewStorage(logger, reg, startTime, data, time.Second*30, nil, false, false)
t.Cleanup(func() {
require.NoError(t, rs.Close())
})
@ -605,7 +605,7 @@ func TestLockfile(t *testing.T) {
func Test_ExistingWAL_NextRef(t *testing.T) {
dbDir := t.TempDir()
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false)
rs := remote.NewStorage(log.NewNopLogger(), nil, startTime, dbDir, time.Second*30, nil, false, false)
defer func() {
require.NoError(t, rs.Close())
}()


@ -0,0 +1,850 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wlog
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
const (
ProgressMarkerDirName string = "remote_write"
ProgressMarkerFileExt string = ".json"
unknownSegment int = -1
)
// ProgressMarker records how far a stateful watcher has read into the WAL. TODO: add versioning?
type ProgressMarker struct {
// TODO:
// FirstSegment int `json:"firstSegmentIndex"`
// LastSegment int `json:"lastSegmentIndex"`
SegmentIndex int `json:"segmentIndex"`
// Offset of read data into the segment.
// Record at the offset is already processed.
Offset int64 `json:"offset"`
}
func (pm *ProgressMarker) String() string {
return fmt.Sprintf("%08d:%d", pm.SegmentIndex, pm.Offset)
}
func (pm *ProgressMarker) isUnknown() bool {
return pm.SegmentIndex == unknownSegment
}
func (w *StatefulWatcher) progressMarkerFilePath() string {
return filepath.Join(w.walDir, ProgressMarkerDirName, w.name+ProgressMarkerFileExt)
}
// TODO: copied from block.go.
func (w *StatefulWatcher) readProgressFile() error {
b, err := os.ReadFile(w.progressMarkerFilePath())
if err != nil {
return err
}
var p ProgressMarker
if err := json.Unmarshal(b, &p); err != nil {
return err
}
w.progressMarker = &p
return nil
}
// TODO: copied from block.go.
func (w *StatefulWatcher) writeProgressFile() (int64, error) {
// Make any changes to the file appear atomic.
path := w.progressMarkerFilePath()
if err := os.MkdirAll(filepath.Dir(path), 0o777); err != nil {
return 0, err
}
tmp := path + ".tmp"
defer func() {
if err := os.RemoveAll(tmp); err != nil {
level.Error(w.logger).Log("msg", "Remove tmp file", "err", err.Error())
}
}()
f, err := os.Create(tmp)
if err != nil {
return 0, err
}
jsonProgress, err := json.MarshalIndent(w.progressMarker, "", "\t")
if err != nil {
return 0, err
}
n, err := f.Write(jsonProgress)
if err != nil {
return 0, errors.Join(err, f.Close())
}
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
// TODO: really need to Sync??
if err := f.Sync(); err != nil {
return 0, errors.Join(err, f.Close())
}
if err := f.Close(); err != nil {
return 0, err
}
return int64(n), fileutil.Replace(tmp, path)
}
func (w *StatefulWatcher) deleteProgressFile() error {
return os.Remove(w.progressMarkerFilePath())
}
type segmentsRange struct {
firstSegment int
currentSegment int
lastSegment int
}
func (sr *segmentsRange) String() string {
return fmt.Sprintf("range: %08d to %08d (current: %08d)", sr.firstSegment, sr.lastSegment, sr.currentSegment)
}
func (sr *segmentsRange) isUnknown() bool {
return sr.firstSegment == unknownSegment || sr.currentSegment == unknownSegment || sr.lastSegment == unknownSegment
}
// StatefulWatcher watches the TSDB WAL for a given WriteTo.
// Unlike Watcher, it keeps track of its progress and resumes where it left off.
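// Its progress marker is persisted as JSON under <wal-dir>/remote_write/<name>.json:
// it is read back on Start, written on Stop, and removed by PurgeState.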
type StatefulWatcher struct {
*Watcher
// seriesOnly indicates if the watcher is currently only processing series samples
// to build its state of the world, or if it has started forwarding data as well.
seriesOnly bool
progressMarker *ProgressMarker
segmentsState *segmentsRange
}
// NewStatefulWatcher creates a new stateful WAL watcher for a given WriteTo.
func NewStatefulWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *StatefulWatcher {
if name == "" {
panic("name should be set as it's used as an indentifier.")
}
w := NewWatcher(metrics, readerMetrics, logger, name, writer, dir, sendExemplars, sendHistograms, sendMetadata)
return &StatefulWatcher{
Watcher: w,
progressMarker: &ProgressMarker{
SegmentIndex: unknownSegment,
},
segmentsState: &segmentsRange{
firstSegment: unknownSegment,
currentSegment: unknownSegment,
lastSegment: unknownSegment,
},
}
}
func (w *StatefulWatcher) SetStartTime(t time.Time) {
// SetStartTime is intentionally a no-op: the stateful watcher decides when to start forwarding data from its progress marker rather than from a start time.
}
// Start the Watcher.
func (w *StatefulWatcher) Start() {
w.setMetrics()
// Look for a previously persisted progress marker.
err := w.readProgressFile()
switch {
case err != nil && os.IsNotExist(err):
level.Debug(w.logger).Log(
"msg", "previous progress marker was not found",
"name", w.name,
)
case err != nil:
level.Warn(w.logger).Log(
"msg", "progress marker file could not be read, it will be reset",
"file", w.progressMarkerFilePath(),
)
}
// create the corresponding file as soon as possible for better visibility.
_, err = w.writeProgressFile()
if err != nil {
// TODO
panic("Return error?")
}
level.Info(w.logger).Log("msg", "Starting WAL watcher")
go w.loop()
}
// Stop the Watcher.
func (w *StatefulWatcher) Stop() {
close(w.quit)
<-w.done
// Persist the progress marker.
_, err := w.writeProgressFile()
if err != nil {
// TODO
panic("Return error?")
}
// Records read metric has series and samples.
if w.metrics != nil {
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
// TODO: add marker progress related metrics, segment and offset
}
level.Info(w.logger).Log("msg", "WAL watcher stopped", "name", w.name)
}
func (w *StatefulWatcher) PurgeState() {
<-w.done
if err := w.deleteProgressFile(); err != nil {
level.Error(w.logger).Log("msg", "Failed to delete the progress marker file", "file", w.progressMarkerFilePath())
}
}
func (w *StatefulWatcher) loop() {
defer close(w.done)
// We may encounter failures processing the WAL; we should wait and retry.
// TODO: should we increment the current segment, just skip the corrupted record, or just keep failing?
for !isClosed(w.quit) {
if err := w.Run(); err != nil {
level.Error(w.logger).Log("msg", "Error reading WAL", "err", err)
}
select {
case <-w.quit:
return
case <-time.After(5 * time.Second):
}
}
}
// syncOnIteration reacts to changes in the WAL segments (segment addition, deletion) to ensure appropriate segments are considered.
// It ensures that on each iteration:
// firstSegment <= w.progressMarker.segmentIndex <= lastSegment
// and
// firstSegment <= currentSegment <= lastSegment.
// When currentSegment is adjusted, the appropriate Checkpoint is read if available.
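// For example, if the progress marker still references a segment older than firstSegment
// (its segments were truncated away), progress is reset to (firstSegment, 0).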
func (w *StatefulWatcher) syncOnIteration() error {
// Look for the last segment.
_, lastSegment, err := Segments(w.walDir)
if err != nil {
return fmt.Errorf("Segments: %w", err)
}
// Look for the first segment after the last checkpoint.
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
if err != nil && !errors.Is(err, record.ErrNotFound) {
return fmt.Errorf("LastCheckpoint: %w", err)
}
// Decision is made later whether the Checkpoint needs to be read.
checkpointAvailable := err == nil
firstSegment, err := w.findSegmentForIndex(checkpointIndex)
if err != nil {
return err
}
level.Debug(w.logger).Log(
"msg", "syncOnIteration",
"prevSegmentsState", w.segmentsState,
"prevProgressMarker", w.progressMarker,
"firstSegment", firstSegment,
"lastSegment", lastSegment,
"lastCheckpoint", lastCheckpoint,
"checkpointIndex", checkpointIndex,
)
// Segments are identified by their names, if nothing changed, return early.
if firstSegment == w.segmentsState.firstSegment && lastSegment >= w.segmentsState.lastSegment {
w.segmentsState.lastSegment = lastSegment
return nil
}
prevProgressMarker := *(w.progressMarker)
prevSegmentsState := *(w.segmentsState)
readCheckpoint := func() error {
// If the current segment is changed, reading the Checkpoint is needed.
if !checkpointAvailable || w.segmentsState.currentSegment == prevSegmentsState.currentSegment {
return nil
}
if err = w.readCheckpoint(lastCheckpoint, checkpointIndex, (*StatefulWatcher).readRecords); err != nil {
return fmt.Errorf("readCheckpoint: %w", err)
}
w.lastCheckpoint = lastCheckpoint
return nil
}
// First iteration.
if w.segmentsState.isUnknown() {
switch {
case w.progressMarker.isUnknown():
// start from the last segment if progress marker is unknown.
w.setProgress(lastSegment, 0)
case prevProgressMarker.SegmentIndex < firstSegment:
// M
// [* * * *
// ^
// M'
w.setProgress(firstSegment, 0)
case prevProgressMarker.SegmentIndex > lastSegment:
// M
// * * * *]
// ^
// M'
w.setProgress(lastSegment, 0)
}
w.setSegmentsState(firstSegment, firstSegment, lastSegment)
return readCheckpoint()
}
// Adjust on the first segment changes.
if firstSegment < prevSegmentsState.firstSegment {
// [* * * * [* * * *
// ^ ^
// F F
// [* * * * * [* * * * * * * * *
// ^ ^
// F'=C'=M' F'=C'=M'
level.Info(w.logger).Log(
"msg", "Segments were added before the previous first segment",
"prevSegmentsState", &prevSegmentsState,
"firstSegment", firstSegment,
)
w.setProgress(firstSegment, 0)
w.segmentsState.currentSegment = firstSegment
}
if firstSegment > prevSegmentsState.firstSegment {
if firstSegment <= prevProgressMarker.SegmentIndex {
// [* * * * * [* * * * *
// ^ ^
// M M
// [* * * * [* * *
// ^ ^
// M' M'
level.Debug(w.logger).Log(
"msg", "Segments whose data was read and forwarded were removed",
"prevSegmentsState", &prevSegmentsState,
"prevProgressMarker", &prevProgressMarker,
"firstSegment", firstSegment,
)
} else {
// [* * * * * * * [* * * * *
// ^ ^
// M M
// [* * * * [* * * *
// ^ ^
// M' M'
w.setProgress(firstSegment, 0)
level.Warn(w.logger).Log(
"msg", "Segments were removed before the watcher ensured all data in them was read and forwarded",
"prevSegmentsState", &prevSegmentsState,
"prevProgressMarker", &prevProgressMarker,
"firstSegment", firstSegment,
)
}
if firstSegment <= prevSegmentsState.currentSegment {
// [* * * * * [* * * * * *
// ^ ^
// C C
// [* * * * [* * * *
// ^ ^
// C' C'
level.Debug(w.logger).Log(
"msg", "Segments that were already replayed were removed",
"prevSegmentsState", &prevSegmentsState,
"firstSegment", firstSegment,
)
} else {
// [* * * * * * * [* * * * * *
// ^ ^
// C C
// [* * * [* * * * *
// ^ ^
// C' C'
w.segmentsState.currentSegment = firstSegment
level.Warn(w.logger).Log(
"msg", "Segments were removed before they could be read",
"prevSegmentsState", &prevSegmentsState,
"firstSegment", firstSegment,
)
}
}
if firstSegment > prevSegmentsState.lastSegment {
// * * * * *]
// ^
// L
// [* * * *
// ^
// F'
level.Debug(w.logger).Log(
"msg", "All segments are subsequent to the previous last segment",
"prevSegmentsState", &prevSegmentsState,
"firstSegment", firstSegment,
)
}
// Adjust on last segment changes.
if lastSegment > prevSegmentsState.lastSegment {
// * * * * *]
// ^
// F
// * * * * * * *]
// ^
// F'
level.Debug(w.logger).Log(
"msg", "New segments were added after the previous last segment",
"prevSegmentsState", &prevSegmentsState,
"lastSegment", lastSegment,
)
}
if lastSegment < prevSegmentsState.lastSegment {
if lastSegment >= prevProgressMarker.SegmentIndex {
// * * * * *] * * * * *]
// ^ ^
// M M
// * * * *] * * * *]
// ^ ^
// M' M'
level.Warn(w.logger).Log(
"msg", "Segments were removed before the watcher ensured all data in them was read and forwarded",
"prevSegmentsState", &prevSegmentsState,
"prevProgressMarker", &prevProgressMarker,
"lastSegment", lastSegment,
)
} else {
if firstSegment >= prevSegmentsState.firstSegment {
// [* * * * * * *]
// ^
// M
// [* * * *]
// ^
// M'
w.setProgress(lastSegment, 0)
}
level.Debug(w.logger).Log(
"msg", "The progress marker points to a segment beyond the last segment",
"prevSegmentsState", &prevSegmentsState,
"prevProgressMarker", &prevProgressMarker,
"lastSegment", lastSegment,
)
}
if lastSegment >= prevSegmentsState.currentSegment {
// * * * * *] * * * * *]
// ^ ^
// C C
// * * * *] * * * *]
// ^ ^
// C' C'
level.Warn(w.logger).Log(
"msg", "Segments were removed before they could be read",
"prevSegmentsState", &prevSegmentsState,
"lastSegment", lastSegment,
)
} else {
if firstSegment >= prevSegmentsState.firstSegment {
// [* * * * * * *]
// ^
// C
// [* * * *]
// ^
// C'
w.segmentsState.currentSegment = lastSegment
}
level.Debug(w.logger).Log(
"msg", "The segment to read is beyond the last segment",
"prevSegmentsState", &prevSegmentsState,
"prevProgressMarker", &prevProgressMarker,
"lastSegment", lastSegment,
)
}
}
if lastSegment < prevSegmentsState.firstSegment {
// [* * * * *
// ^
// F
// * * * * *]
// ^
// L'
level.Debug(w.logger).Log(
"msg", "All segments are prior to the previous first segment",
"prevSegmentsState", &prevSegmentsState,
"lastSegment", lastSegment,
)
}
w.setSegmentsState(firstSegment, w.segmentsState.currentSegment, lastSegment)
return readCheckpoint()
}
// Run the watcher, which will tail the last WAL segment until the quit channel is closed
// or an error case is hit.
func (w *StatefulWatcher) Run() error {
level.Info(w.logger).Log("msg", "Replaying WAL")
for !isClosed(w.quit) {
// Replay series only at first; the watcher starts forwarding data once reading reaches the progress marker.
w.seriesOnly = true
if err := w.syncOnIteration(); err != nil {
return err
}
w.currentSegmentMetric.Set(float64(w.segmentsState.currentSegment))
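// Decide how to process the current segment: a segment at or after the progress
// marker (and before the last one) is read strictly, a segment before the marker
// is read best-effort, and the last segment is read and tailed.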
switch {
case w.progressMarker.SegmentIndex <= w.segmentsState.currentSegment && w.segmentsState.currentSegment < w.segmentsState.lastSegment:
level.Debug(w.logger).Log(
"msg", "Reading a segment subsequent or equal to where the watcher left off",
"segmentsState", w.segmentsState,
"progressMarker", w.progressMarker,
)
if err := w.readSegmentStrict(w.walDir, w.segmentsState.currentSegment, (*StatefulWatcher).readRecords); err != nil {
return err
}
case w.segmentsState.currentSegment < w.progressMarker.SegmentIndex:
level.Debug(w.logger).Log(
"msg", "Reading a segment prior to where the watcher left off",
"segmentsState", w.segmentsState,
"progressMarker", w.progressMarker,
)
w.readSegmentIgnoreErrors(w.walDir, w.segmentsState.currentSegment, (*StatefulWatcher).readRecords)
default:
level.Debug(w.logger).Log(
"msg", "Reading and tailing the last segment",
"segmentsState", w.segmentsState,
"progressMarker", w.progressMarker,
)
if err := w.watch(w.segmentsState.currentSegment); err != nil {
return err
}
}
// For testing: stop when you hit a specific segment.
if w.segmentsState.currentSegment == w.MaxSegment {
return nil
}
w.segmentsState.currentSegment++
}
return nil
}
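// watch reads the given segment to its current end without waiting for tickers,
// then tails it: new records are read as they are written, series are periodically
// garbage collected against new checkpoints, and the function returns once a newer
// segment appears or the watcher is stopped.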
func (w *StatefulWatcher) watch(segmentIndex int) error {
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentIndex))
if err != nil {
return err
}
defer segment.Close()
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
// Don't wait for tickers to process records that are already in the segment.
err = w.readRecordsIgnoreEOF(reader, segmentIndex)
if err != nil {
return err
}
checkpointTicker := time.NewTicker(w.checkpointPeriod)
defer checkpointTicker.Stop()
segmentTicker := time.NewTicker(w.segmentCheckPeriod)
defer segmentTicker.Stop()
readTicker := time.NewTicker(w.readTimeout)
defer readTicker.Stop()
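// gcSem ensures at most one garbage collection of series runs at a time.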
gcSem := make(chan struct{}, 1)
for {
select {
case <-w.quit:
return nil
case <-checkpointTicker.C:
// Periodically check if there is a new checkpoint so we can garbage
// collect labels. As this is considered an optimisation, we ignore
// errors during checkpoint processing. Doing the process asynchronously
// allows the current WAL segment to be processed while reading the
// checkpoint.
select {
case gcSem <- struct{}{}:
go func() {
defer func() {
<-gcSem
}()
if err := w.garbageCollectSeries(segmentIndex); err != nil {
level.Warn(w.logger).Log("msg", "Error process checkpoint", "err", err)
}
}()
default:
// Currently doing a garbage collection, try again later.
}
// if a newer segment is produced, read the current one until the end and move on.
case <-segmentTicker.C:
_, last, err := Segments(w.walDir)
if err != nil {
return fmt.Errorf("Segments: %w", err)
}
if last > segmentIndex {
return w.readRecordsIgnoreEOF(reader, segmentIndex)
}
// We haven't read due to a notification in quite some time, try reading anyway.
case <-readTicker.C:
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout)
err := w.readRecordsIgnoreEOF(reader, segmentIndex)
if err != nil {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(w.readTimeout)
case <-w.readNotify:
err := w.readRecordsIgnoreEOF(reader, segmentIndex)
if err != nil {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(w.readTimeout)
}
}
}
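// recordsReadFn processes the records of a single segment; readRecords is the
// main implementation.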
type recordsReadFn func(w *StatefulWatcher, r *LiveReader, segmentIndex int) error
// readSegmentStrict reads the segment and returns an error if the segment was not read completely.
func (w *StatefulWatcher) readSegmentStrict(segmentDir string, segmentIndex int, readFn recordsReadFn) error {
segment, err := OpenReadSegment(SegmentName(segmentDir, segmentIndex))
if err != nil {
return fmt.Errorf("unable to open segment: %w", err)
}
defer segment.Close()
size, err := getSegmentSize(segmentDir, segmentIndex)
if err != nil {
return fmt.Errorf("getSegmentSize: %w", err)
}
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
err = readFn(w, reader, segmentIndex)
if errors.Is(err, io.EOF) {
if reader.Offset() == size {
return nil
}
return fmt.Errorf("readSegmentStrict wasn't able to read all data from the segment %08d, size: %d, totalRead: %d", segmentIndex, size, reader.Offset())
}
return err
}
// readSegmentIgnoreErrors reads the segment; any error is logged and ignored.
// TODO: document why we need this, https://github.com/prometheus/prometheus/pull/5214/files
func (w *StatefulWatcher) readSegmentIgnoreErrors(segmentDir string, segmentIndex int, readFn recordsReadFn) {
err := w.readSegmentStrict(segmentDir, segmentIndex, readFn)
if err != nil {
level.Warn(w.logger).Log("msg", "Ignoring error reading the segment, may have dropped data", "segment", segmentIndex, "err", err)
}
}
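// checkProgressMarkerReached switches the watcher from replaying series only to
// forwarding data once reading has moved past the stored progress marker.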
func (w *StatefulWatcher) checkProgressMarkerReached(segmentIndex int, offset int64) {
if !w.seriesOnly {
return
}
// Check if we arrived back where we left off.
// offset > w.progressMarker.Offset is used because the segment may have been altered and replaced, which is neither detected nor supported.
if segmentIndex > w.progressMarker.SegmentIndex || (segmentIndex == w.progressMarker.SegmentIndex && offset > w.progressMarker.Offset) {
// make the watcher start forwarding data.
w.seriesOnly = false
}
}
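// setSegmentsState updates the first, current and last segment indices tracked by the watcher.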
func (w *StatefulWatcher) setSegmentsState(firstSegment, currentSegment, lastSegment int) {
w.segmentsState.firstSegment = firstSegment
w.segmentsState.currentSegment = currentSegment
w.segmentsState.lastSegment = lastSegment
}
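// setProgress moves the progress marker to the given segment index and offset.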
func (w *StatefulWatcher) setProgress(segmentIndex int, offset int64) {
w.progressMarker.SegmentIndex = segmentIndex
w.progressMarker.Offset = offset
}
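// UpdateProgress records the position (segment index and offset) up to which data
// has been read and forwarded. It is a no-op while the watcher is still replaying
// series only.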
func (w *StatefulWatcher) UpdateProgress(segmentIndex int, offset int64) {
// The watcher isn't forwarding data yet.
if w.seriesOnly {
return
}
w.setProgress(segmentIndex, offset)
}
// Read records and pass the details to w.writer.
// Also used with readCheckpoint - implements recordsReadFn.
func (w *StatefulWatcher) readRecords(r *LiveReader, segmentIndex int) error {
var (
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
series []record.RefSeries
samples []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
metadata []record.RefMetadata
)
for r.Next() && !isClosed(w.quit) {
w.checkProgressMarkerReached(segmentIndex, r.Offset())
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreSeries(series, segmentIndex)
case record.Samples:
if w.seriesOnly {
break
}
samples, err := dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
if len(samples) > 0 {
w.writer.Append(samples)
}
case record.Exemplars:
// Skip if experimental "exemplars over remote write" is not enabled.
if !w.sendExemplars || w.seriesOnly {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
if len(exemplars) > 0 {
w.writer.AppendExemplars(exemplars)
}
case record.HistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms || w.seriesOnly {
break
}
histograms, err := dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
if len(histograms) > 0 {
w.writer.AppendHistograms(histograms)
}
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms || w.seriesOnly {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
if len(floatHistograms) > 0 {
w.writer.AppendFloatHistograms(floatHistograms)
}
case record.Metadata:
if !w.sendMetadata {
break
}
meta, err := dec.Metadata(rec, metadata[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreMetadata(meta)
case record.Tombstones:
default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
}
// TODO: check AppendXXX returned values.
// In the future, the shards will influence this.
w.UpdateProgress(segmentIndex, r.Offset())
}
if err := r.Err(); err != nil {
return fmt.Errorf("readRecods from segment %08d: %w", segmentIndex, err)
}
return nil
}
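// readRecordsIgnoreEOF is like readRecords but treats io.EOF as the expected end
// of the currently available data rather than an error.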
func (w *StatefulWatcher) readRecordsIgnoreEOF(r *LiveReader, segmentIndex int) error {
if err := w.readRecords(r, segmentIndex); err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
}
// Read all the segments from a Checkpoint directory.
func (w *StatefulWatcher) readCheckpoint(checkpointDir string, checkpointIndex int, readFn recordsReadFn) error {
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
// Records from these segments should be marked as originating from the Checkpoint index itself.
indexAwareReadFn := func(w *StatefulWatcher, r *LiveReader, _ int) error {
return readFn(w, r, checkpointIndex)
}
// Read all segments in the checkpoint dir.
segs, err := listSegments(checkpointDir)
if err != nil {
return fmt.Errorf("Unable to get segments in checkpoint dir: %w", err)
}
for _, segRef := range segs {
err := w.readSegmentStrict(checkpointDir, segRef.index, indexAwareReadFn)
if err != nil {
return fmt.Errorf("readSegmentStrict from the checkpoint %s: %w", checkpointDir, err)
}
}
level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
return nil
}

File diff suppressed because it is too large

View file

@ -34,15 +34,21 @@ import (
)
const (
checkpointPeriod = 5 * time.Second
segmentCheckPeriod = 100 * time.Millisecond
consumer = "consumer"
consumer = "consumer"
)
var (
ErrIgnorable = errors.New("ignore me")
readTimeout = 15 * time.Second
)
var ErrIgnorable = errors.New("ignore me")
// WALWatcher allows choosing between Watcher and StatefulWatcher.
type WALWatcher interface {
// Used as an identifier for StatefulWatcher.
Name() string
Start()
Notify()
// Only makes sense for StatefulWatcher.
PurgeState()
Stop()
}
// WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently
@ -73,11 +79,10 @@ type WriteNotified interface {
}
type WatcherMetrics struct {
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
samplesSentPreTailing *prometheus.CounterVec
currentSegment *prometheus.GaugeVec
notificationsSkipped *prometheus.CounterVec
recordsRead *prometheus.CounterVec
recordDecodeFails *prometheus.CounterVec
currentSegment *prometheus.GaugeVec
notificationsSkipped *prometheus.CounterVec
}
// Watcher watches the TSDB WAL for a given WriteTo.
@ -99,7 +104,6 @@ type Watcher struct {
recordsReadMetric *prometheus.CounterVec
recordDecodeFailsMetric prometheus.Counter
samplesSentPreTailing prometheus.Counter
currentSegmentMetric prometheus.Gauge
notificationsSkipped prometheus.Counter
@ -107,6 +111,11 @@ type Watcher struct {
quit chan struct{}
done chan struct{}
// To ease their use in tests.
checkpointPeriod time.Duration
segmentCheckPeriod time.Duration
readTimeout time.Duration
// For testing, stop when we hit this segment.
MaxSegment int
}
@ -131,15 +140,6 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
},
[]string{consumer},
),
samplesSentPreTailing: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "wal_watcher",
Name: "samples_sent_pre_tailing_total",
Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
},
[]string{consumer},
),
currentSegment: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "prometheus",
@ -163,7 +163,6 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
if reg != nil {
reg.MustRegister(m.recordsRead)
reg.MustRegister(m.recordDecodeFails)
reg.MustRegister(m.samplesSentPreTailing)
reg.MustRegister(m.currentSegment)
reg.MustRegister(m.notificationsSkipped)
}
@ -191,10 +190,18 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge
quit: make(chan struct{}),
done: make(chan struct{}),
checkpointPeriod: 5 * time.Second,
segmentCheckPeriod: 100 * time.Millisecond,
readTimeout: 15 * time.Second,
MaxSegment: -1,
}
}
func (w *Watcher) Name() string {
return w.name
}
func (w *Watcher) Notify() {
select {
case w.readNotify <- struct{}{}:
@ -213,7 +220,6 @@ func (w *Watcher) setMetrics() {
if w.metrics != nil {
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
}
@ -237,13 +243,16 @@ func (w *Watcher) Stop() {
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
w.metrics.currentSegment.DeleteLabelValues(w.name)
}
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
}
func (w *Watcher) PurgeState() {
level.Debug(w.logger).Log("msg", "Watcher has no state to purge")
}
func (w *Watcher) loop() {
defer close(w.done)
@ -375,13 +384,13 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return w.readAndHandleError(reader, segmentNum, tail, size)
}
checkpointTicker := time.NewTicker(checkpointPeriod)
checkpointTicker := time.NewTicker(w.checkpointPeriod)
defer checkpointTicker.Stop()
segmentTicker := time.NewTicker(segmentCheckPeriod)
segmentTicker := time.NewTicker(w.segmentCheckPeriod)
defer segmentTicker.Stop()
readTicker := time.NewTicker(readTimeout)
readTicker := time.NewTicker(w.readTimeout)
defer readTicker.Stop()
gcSem := make(chan struct{}, 1)
@ -420,17 +429,16 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
if last > segmentNum {
return w.readAndHandleError(reader, segmentNum, tail, size)
}
continue
// we haven't read due to a notification in quite some time, try reading anyways
case <-readTicker.C:
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", w.readTimeout)
err := w.readAndHandleError(reader, segmentNum, tail, size)
if err != nil {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
readTicker.Reset(w.readTimeout)
case <-w.readNotify:
err := w.readAndHandleError(reader, segmentNum, tail, size)
@ -438,7 +446,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
return err
}
// reset the ticker so we don't read too often
readTicker.Reset(readTimeout)
readTicker.Reset(w.readTimeout)
}
}
}
@ -452,9 +460,10 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
if dir == "" || dir == w.lastCheckpoint {
return nil
}
// TODO: shouldn't we set this later, after readCheckpoint??
w.lastCheckpoint = dir
index, err := checkpointNum(dir)
index, err := checkpointIndex(dir)
if err != nil {
return fmt.Errorf("error parsing checkpoint filename: %w", err)
}
@ -470,7 +479,7 @@ func (w *Watcher) garbageCollectSeries(segmentNum int) error {
return fmt.Errorf("readCheckpoint: %w", err)
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
// Clear series with a checkpoint or segment index lower than the checkpoint index we just read.
w.writer.SeriesReset(index)
return nil
}
@ -671,9 +680,9 @@ type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) er
// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
index, err := checkpointNum(checkpointDir)
index, err := checkpointIndex(checkpointDir)
if err != nil {
return fmt.Errorf("checkpointNum: %w", err)
return fmt.Errorf("checkpointIndex: %w", err)
}
// Ensure we read the whole contents of every segment in the checkpoint dir.
@ -708,7 +717,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) err
return nil
}
func checkpointNum(dir string) (int, error) {
func checkpointIndex(dir string) (int, error) {
// Checkpoint dir names are in the format checkpoint.000001
// dir may contain a hidden directory, so only check the base directory
chunks := strings.Split(filepath.Base(dir), ".")

View file

@ -17,7 +17,6 @@ import (
"math/rand"
"os"
"path"
"runtime"
"sync"
"testing"
"time"
@ -54,9 +53,13 @@ func retry(t *testing.T, interval time.Duration, n int, f func() bool) {
type writeToMock struct {
samplesAppended int
samplesLock sync.Mutex
exemplarsAppended int
exemplarsLock sync.Mutex
histogramsAppended int
histogramsLock sync.Mutex
floatHistogramsAppended int
floatHistogramsLock sync.Mutex
seriesLock sync.Mutex
seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@ -65,24 +68,32 @@ type writeToMock struct {
}
func (wtm *writeToMock) Append(s []record.RefSample) bool {
wtm.samplesLock.Lock()
defer wtm.samplesLock.Unlock()
time.Sleep(wtm.delay)
wtm.samplesAppended += len(s)
return true
}
func (wtm *writeToMock) AppendExemplars(e []record.RefExemplar) bool {
wtm.exemplarsLock.Lock()
defer wtm.exemplarsLock.Unlock()
time.Sleep(wtm.delay)
wtm.exemplarsAppended += len(e)
return true
}
func (wtm *writeToMock) AppendHistograms(h []record.RefHistogramSample) bool {
wtm.histogramsLock.Lock()
defer wtm.histogramsLock.Unlock()
time.Sleep(wtm.delay)
wtm.histogramsAppended += len(h)
return true
}
func (wtm *writeToMock) AppendFloatHistograms(fh []record.RefFloatHistogramSample) bool {
wtm.floatHistogramsLock.Lock()
defer wtm.floatHistogramsLock.Unlock()
time.Sleep(wtm.delay)
wtm.floatHistogramsAppended += len(fh)
return true
@ -115,12 +126,36 @@ func (wtm *writeToMock) SeriesReset(index int) {
}
}
func (wtm *writeToMock) checkNumSeries() int {
func (wtm *writeToMock) seriesCount() int {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
return len(wtm.seriesSegmentIndexes)
}
func (wtm *writeToMock) samplesCount() int {
wtm.samplesLock.Lock()
defer wtm.samplesLock.Unlock()
return wtm.samplesAppended
}
func (wtm *writeToMock) exemplarsCount() int {
wtm.exemplarsLock.Lock()
defer wtm.exemplarsLock.Unlock()
return wtm.exemplarsAppended
}
func (wtm *writeToMock) histogramsCount() int {
wtm.histogramsLock.Lock()
defer wtm.histogramsLock.Unlock()
return wtm.histogramsAppended
}
func (wtm *writeToMock) floatHistogramsCount() int {
wtm.floatHistogramsLock.Lock()
defer wtm.floatHistogramsLock.Unlock()
return wtm.floatHistogramsAppended
}
func newWriteToMock(delay time.Duration) *writeToMock {
return &writeToMock{
seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
@ -129,6 +164,8 @@ func newWriteToMock(delay time.Duration) *writeToMock {
}
func TestTailSamples(t *testing.T) {
t.Parallel()
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
@ -136,6 +173,8 @@ func TestTailSamples(t *testing.T) {
const histogramsCount = 50
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
t.Parallel()
now := time.Now()
dir := t.TempDir()
@ -242,24 +281,28 @@ func TestTailSamples(t *testing.T) {
expectedExemplars := seriesCount * exemplarsCount
expectedHistograms := seriesCount * histogramsCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expectedSeries
return wt.seriesCount() >= expectedSeries
})
require.Equal(t, expectedSeries, wt.checkNumSeries(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesAppended, "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsAppended, "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsAppended, "did not receive the expected number of histograms")
require.Equal(t, expectedHistograms, wt.floatHistogramsAppended, "did not receive the expected number of float histograms")
require.Equal(t, expectedSeries, wt.seriesCount(), "did not receive the expected number of series")
require.Equal(t, expectedSamples, wt.samplesCount(), "did not receive the expected number of samples")
require.Equal(t, expectedExemplars, wt.exemplarsCount(), "did not receive the expected number of exemplars")
require.Equal(t, expectedHistograms, wt.histogramsCount(), "did not receive the expected number of histograms")
require.Equal(t, expectedHistograms, wt.floatHistogramsCount(), "did not receive the expected number of float histograms")
})
}
}
func TestReadToEndNoCheckpoint(t *testing.T) {
t.Parallel()
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
@ -302,24 +345,27 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
}
}
require.NoError(t, w.Log(recs...))
readTimeout = time.Second
_, _, err = Segments(w.Dir())
require.NoError(t, err)
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.readTimeout = time.Second
go watcher.Start()
expected := seriesCount
require.Eventually(t, func() bool {
return wt.checkNumSeries() == expected
return wt.seriesCount() == expected
}, 20*time.Second, 1*time.Second)
watcher.Stop()
})
}
}
// TestReadToEndWithCheckpoint.
func TestReadToEndWithCheckpoint(t *testing.T) {
t.Parallel()
segmentSize := 32 * 1024
// We need something similar to this # of series and samples
// in order to get enough segments for us to checkpoint.
@ -328,6 +374,8 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -394,15 +442,15 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.readTimeout = time.Second
go watcher.Start()
expected := seriesCount * 2
require.Eventually(t, func() bool {
return wt.checkNumSeries() == expected
return wt.seriesCount() == expected
}, 10*time.Second, 1*time.Second)
watcher.Stop()
})
@ -410,12 +458,16 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
}
func TestReadCheckpoint(t *testing.T) {
t.Parallel()
pageSize := 32 * 1024
const seriesCount = 10
const samplesCount = 250
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -472,15 +524,17 @@ func TestReadCheckpoint(t *testing.T) {
expectedSeries := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expectedSeries
return wt.seriesCount() >= expectedSeries
})
watcher.Stop()
require.Equal(t, expectedSeries, wt.checkNumSeries())
require.Equal(t, expectedSeries, wt.seriesCount())
})
}
}
func TestReadCheckpointMultipleSegments(t *testing.T) {
t.Parallel()
pageSize := 32 * 1024
const segments = 1
@ -489,6 +543,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -552,6 +608,8 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
}
func TestCheckpointSeriesReset(t *testing.T) {
t.Parallel()
segmentSize := 32 * 1024
// We need something similar to this # of series and samples
// in order to get enough segments for us to checkpoint.
@ -607,18 +665,18 @@ func TestCheckpointSeriesReset(t *testing.T) {
_, _, err = Segments(w.Dir())
require.NoError(t, err)
readTimeout = time.Second
wt := newWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.readTimeout = time.Second
watcher.MaxSegment = -1
go watcher.Start()
expected := seriesCount
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() >= expected
return wt.seriesCount() >= expected
})
require.Eventually(t, func() bool {
return wt.checkNumSeries() == seriesCount
return wt.seriesCount() == seriesCount
}, 10*time.Second, 1*time.Second)
_, err = Checkpoint(log.NewNopLogger(), w, 2, 4, func(x chunks.HeadSeriesRef) bool { return true }, 0)
@ -637,13 +695,16 @@ func TestCheckpointSeriesReset(t *testing.T) {
// many series records you end up with and change the last Equals check accordingly
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
require.Eventually(t, func() bool {
return wt.checkNumSeries() == tc.segments
return wt.seriesCount() == tc.segments
}, 20*time.Second, 1*time.Second)
})
}
}
// TestRun_StartupTime ensures that on startup, the watcher doesn't rely on readTimeout to read existing segments.
func TestRun_StartupTime(t *testing.T) {
t.Parallel()
const pageSize = 32 * 1024
const segments = 10
const seriesCount = 20
@ -651,6 +712,8 @@ func TestRun_StartupTime(t *testing.T) {
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
wdir := path.Join(dir, "wal")
@ -695,7 +758,7 @@ func TestRun_StartupTime(t *testing.T) {
startTime := time.Now()
err = watcher.Run()
require.Less(t, time.Since(startTime), readTimeout)
require.Less(t, time.Since(startTime), watcher.readTimeout)
require.NoError(t, err)
})
}
@ -732,60 +795,4 @@ func generateWALRecords(w *WL, segment, seriesCount, samplesCount int) error {
return nil
}
func TestRun_AvoidNotifyWhenBehind(t *testing.T) {
if runtime.GOOS == "windows" { // Takes a really long time, perhaps because min sleep time is 15ms.
t.SkipNow()
}
const segmentSize = pageSize // Smallest allowed segment size.
const segmentsToWrite = 5
const segmentsToRead = segmentsToWrite - 1
const seriesCount = 10
const samplesCount = 50
// This test can take longer than intended to finish in cloud CI.
readTimeout := 10 * time.Second
for _, compress := range []CompressionType{CompressionNone, CompressionSnappy, CompressionZstd} {
t.Run(string(compress), func(t *testing.T) {
dir := t.TempDir()
wdir := path.Join(dir, "wal")
err := os.Mkdir(wdir, 0o777)
require.NoError(t, err)
w, err := NewSize(nil, nil, wdir, segmentSize, compress)
require.NoError(t, err)
var wg sync.WaitGroup
// Generate one segment initially to ensure that watcher.Run() finds at least one segment on disk.
require.NoError(t, generateWALRecords(w, 0, seriesCount, samplesCount))
w.NextSegment() // Force creation of the next segment
wg.Add(1)
go func() {
defer wg.Done()
for i := 1; i < segmentsToWrite; i++ {
require.NoError(t, generateWALRecords(w, i, seriesCount, samplesCount))
w.NextSegment()
}
}()
wt := newWriteToMock(time.Millisecond)
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false)
watcher.MaxSegment = segmentsToRead
watcher.setMetrics()
startTime := time.Now()
err = watcher.Run()
wg.Wait()
require.Less(t, time.Since(startTime), readTimeout)
// But samples records shouldn't get dropped
retry(t, defaultRetryInterval, defaultRetries, func() bool {
return wt.checkNumSeries() > 0
})
require.Equal(t, segmentsToRead*seriesCount*samplesCount, wt.samplesAppended)
require.NoError(t, err)
require.NoError(t, w.Close())
})
}
}
// TODO: add TestRun_AvoidNotifyWhenBehind back

View file

@ -486,7 +486,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
}, dbDir, 1*time.Second, nil, false)
}, dbDir, 1*time.Second, nil, false, false)
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{