Fix bug with WAL watcher and Live Reader metrics usage. (#6998)

* Fix bug with WAL watcher and Live Reader metrics usage.

Calling the NewXMetrics constructors (NewWatcherMetrics, NewLiveReaderMetrics) every time a Watcher or LiveReader was created resulted in a duplicate registration error, which we were silently ignoring. As a result, every Watcher/Reader after the first one had no working metrics, so we would only get metrics such as Watcher Records Read for the first remote write config in a user's config file. The fix is to create the Watcher and LiveReader metrics once, in the WriteStorage, and inject them into each QueueManager, Watcher and LiveReader.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Callum Styan, 2020-03-20 09:34:15 -07:00, committed by GitHub
commit f802f1e8ca (parent 7920305e4e)
6 changed files with 55 additions and 49 deletions

View file

@@ -271,7 +271,7 @@ type QueueManager struct {
 }
 // NewQueueManager builds a new QueueManager.
-func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
+func NewQueueManager(metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -301,7 +301,7 @@ func NewQueueManager(reg prometheus.Registerer, metrics *queueManagerMetrics, lo
 		metrics: metrics,
 	}
-	t.watcher = wal.NewWatcher(reg, wal.NewWatcherMetrics(reg), logger, client.Name(), t, walDir)
+	t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir)
 	t.shards = t.newShards()
 	return t

View file

@@ -61,7 +61,7 @@ func TestSampleDelivery(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	// These should be received by the client.
@@ -90,7 +90,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	m.Start()
 	defer m.Stop()
@@ -131,7 +131,7 @@ func TestSampleDeliveryOrder(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	m.Start()
@@ -150,7 +150,8 @@ func TestShutdown(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
 	samples, series := createTimeseries(n, n)
 	m.StoreSeries(series, 0)
@@ -188,7 +189,7 @@ func TestSeriesReset(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
 	for i := 0; i < numSegments; i++ {
 		series := []record.RefSeries{}
 		for j := 0; j < numSeries; j++ {
@@ -218,7 +219,7 @@ func TestReshard(t *testing.T) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	m.Start()
@@ -251,7 +252,7 @@ func TestReshardRaceWithStop(t *testing.T) {
 	go func() {
 		for {
 			metrics := newQueueManagerMetrics(nil)
-			m = NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+			m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
			m.Start()
			h.Unlock()
			h.Lock()
@@ -269,7 +270,7 @@ func TestReshardRaceWithStop(t *testing.T) {
 func TestReleaseNoninternedString(t *testing.T) {
 	metrics := newQueueManagerMetrics(nil)
 	c := NewTestStorageClient()
-	m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
 	m.Start()
 	for i := 1; i < 1000; i++ {
@@ -316,7 +317,7 @@ func TestCalculateDesiredsShards(t *testing.T) {
 	for _, c := range cases {
 		metrics := newQueueManagerMetrics(nil)
 		client := NewTestStorageClient()
-		m := NewQueueManager(nil, metrics, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
+		m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline)
 		m.numShards = c.startingShards
 		m.samplesIn.incr(c.samplesIn)
 		m.samplesOut.incr(c.samplesOut)
@@ -527,7 +528,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
 	defer os.RemoveAll(dir)
 	metrics := newQueueManagerMetrics(nil)
-	m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
 	m.StoreSeries(series, 0)
 	// These should be received by the client.
@@ -569,7 +570,7 @@ func BenchmarkStartup(b *testing.B) {
 	for n := 0; n < b.N; n++ {
 		metrics := newQueueManagerMetrics(nil)
 		c := NewTestBlockedStorageClient()
-		m := NewQueueManager(nil, metrics, logger, dir,
+		m := NewQueueManager(metrics, nil, nil, logger, dir,
 			newEWMARate(ewmaWeight, shardUpdateDuration),
 			config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
 		m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
@@ -620,7 +621,7 @@ func TestCalculateDesiredShards(t *testing.T) {
 	metrics := newQueueManagerMetrics(nil)
 	samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
-	m := NewQueueManager(nil, metrics, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline)
 	// Need to start the queue manager so the proper metrics are initialized.
 	// However we can stop it right away since we don't need to do any actual

View file

@@ -25,6 +25,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/wal"
 )
 var (
@@ -46,11 +47,12 @@ var (
 // WriteStorage represents all the remote write storage.
 type WriteStorage struct {
-	reg    prometheus.Registerer
 	logger log.Logger
 	mtx    sync.Mutex
 	queueMetrics      *queueManagerMetrics
+	watcherMetrics    *wal.WatcherMetrics
+	liveReaderMetrics *wal.LiveReaderMetrics
 	configHash        string
 	externalLabelHash string
 	walDir            string
@@ -66,8 +68,9 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
 	}
 	rws := &WriteStorage{
 		queues:            make(map[string]*QueueManager),
-		reg:               reg,
 		queueMetrics:      newQueueManagerMetrics(reg),
+		watcherMetrics:    wal.NewWatcherMetrics(reg),
+		liveReaderMetrics: wal.NewLiveReaderMetrics(reg),
 		logger:            logger,
 		flushDeadline:     flushDeadline,
 		samplesIn:         newEWMARate(ewmaWeight, shardUpdateDuration),
@@ -152,8 +155,9 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			return err
 		}
 		newQueues[hash] = NewQueueManager(
-			rws.reg,
 			rws.queueMetrics,
+			rws.watcherMetrics,
+			rws.liveReaderMetrics,
 			rws.logger,
 			rws.walDir,
 			rws.samplesIn,
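
The hunks above move metric construction up into the WriteStorage, so each metrics struct is built (and registered) exactly once per registerer and then shared by every queue. A rough, self-contained sketch of that ownership pattern, using simplified hypothetical types (watcherMetrics, newWatcherMetrics, the watcher struct and the metric name are illustrative, not the real Prometheus types):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// watcherMetrics stands in for wal.WatcherMetrics: constructed once per
// registry and shared by every watcher; per-queue series are separated by a
// "queue" label rather than by separate registrations.
type watcherMetrics struct {
	recordsRead *prometheus.CounterVec
}

func newWatcherMetrics(reg prometheus.Registerer) *watcherMetrics {
	m := &watcherMetrics{
		recordsRead: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "demo_watcher_records_read_total",
				Help: "Records read by the watcher, per queue.",
			},
			[]string{"queue"},
		),
	}
	if reg != nil {
		reg.MustRegister(m.recordsRead)
	}
	return m
}

// watcher receives the shared metrics instead of constructing its own.
type watcher struct {
	name    string
	metrics *watcherMetrics
}

func main() {
	reg := prometheus.NewRegistry()
	metrics := newWatcherMetrics(reg) // built exactly once

	// One watcher per remote-write queue, all sharing the same metrics.
	for _, queue := range []string{"remote-a", "remote-b"} {
		w := &watcher{name: queue, metrics: metrics}
		w.metrics.recordsRead.WithLabelValues(w.name).Add(3)
	}

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Printf("%s: %d labelled series\n", mf.GetName(), len(mf.GetMetric()))
	}
	// Prints: demo_watcher_records_read_total: 2 labelled series
}

Keeping the nil-registerer check in the constructors also lets tests build the structs without touching a shared registry, which is what the test hunks below rely on.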

View file

@@ -28,14 +28,14 @@ import (
 )
 // liveReaderMetrics holds all metrics exposed by the LiveReader.
-type liveReaderMetrics struct {
+type LiveReaderMetrics struct {
 	readerCorruptionErrors *prometheus.CounterVec
 }
 // NewLiveReaderMetrics instantiates, registers and returns metrics to be injected
 // at LiveReader instantiation.
-func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
-	m := &liveReaderMetrics{
+func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics {
+	m := &LiveReaderMetrics{
 		readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
 			Help: "Errors encountered when reading the WAL.",
@@ -43,15 +43,14 @@ func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
 	}
 	if reg != nil {
-		// TODO(codesome): log error.
-		_ = reg.Register(m.readerCorruptionErrors)
+		reg.MustRegister(m.readerCorruptionErrors)
 	}
 	return m
 }
 // NewLiveReader returns a new live reader.
-func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader {
+func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader {
 	lr := &LiveReader{
 		logger: logger,
 		rdr:    r,
@@ -89,7 +88,7 @@ type LiveReader struct {
 	// NB the non-ive Reader implementation allows for this.
 	permissive bool
-	metrics *liveReaderMetrics
+	metrics *LiveReaderMetrics
 }
 // Err returns any errors encountered reading the WAL. io.EOFs are not terminal
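
Because the metrics are now constructed only once per registerer, the previously swallowed registration error can be promoted to MustRegister, which panics on a duplicate and so turns any future regression of this wiring into a loud failure rather than silently missing metrics. A small standalone illustration of the difference (the registry, metric name and flow are made up; only the client_golang calls are real):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	opts := prometheus.CounterOpts{Name: "demo_total", Help: "Demo."}

	reg.MustRegister(prometheus.NewCounter(opts)) // first registration: fine

	// Old pattern: the duplicate registration error is easy to discard.
	if err := reg.Register(prometheus.NewCounter(opts)); err != nil {
		fmt.Println("ignored error:", err) // AlreadyRegisteredError
	}

	// New pattern: a duplicate panics, so it cannot go unnoticed.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("MustRegister panicked:", r)
		}
	}()
	reg.MustRegister(prometheus.NewCounter(opts))
}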

View file

@@ -66,7 +66,7 @@ type Watcher struct {
 	walDir         string
 	lastCheckpoint string
 	metrics        *WatcherMetrics
-	readerMetrics  *liveReaderMetrics
+	readerMetrics  *LiveReaderMetrics
 	startTime      time.Time
 	startTimestamp int64 // the start time as a Prometheus timestamp
@@ -125,17 +125,17 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
 	}
 	if reg != nil {
-		_ = reg.Register(m.recordsRead)
-		_ = reg.Register(m.recordDecodeFails)
-		_ = reg.Register(m.samplesSentPreTailing)
-		_ = reg.Register(m.currentSegment)
+		reg.MustRegister(m.recordsRead)
+		reg.MustRegister(m.recordDecodeFails)
+		reg.MustRegister(m.samplesSentPreTailing)
+		reg.MustRegister(m.currentSegment)
 	}
 	return m
 }
 // NewWatcher creates a new WAL watcher for a given WriteTo.
-func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
+func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -143,7 +143,7 @@ func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.L
 		logger:        logger,
 		writer:        writer,
 		metrics:       metrics,
-		readerMetrics: NewLiveReaderMetrics(reg),
+		readerMetrics: readerMetrics,
 		walDir:        path.Join(walDir, "wal"),
 		name:          name,
 		quit:          make(chan struct{}),
@@ -179,11 +179,13 @@ func (w *Watcher) Stop() {
 	<-w.done
 	// Records read metric has series and samples.
+	if w.metrics != nil {
 		w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
 		w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
 		w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
 		w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
 		w.metrics.currentSegment.DeleteLabelValues(w.name)
+	}
 	level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
 }

View file

@@ -138,7 +138,7 @@ func TestTailSamples(t *testing.T) {
 	testutil.Ok(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	watcher.SetStartTime(now)
 	// Set the Watcher's metrics so they're not nil pointers.
@@ -148,7 +148,7 @@ func TestTailSamples(t *testing.T) {
 		testutil.Ok(t, err)
 		defer segment.Close()
-		reader := NewLiveReader(nil, NewLiveReaderMetrics(prometheus.DefaultRegisterer), segment)
+		reader := NewLiveReader(nil, NewLiveReaderMetrics(nil), segment)
 		// Use tail true so we can ensure we got the right number of samples.
 		watcher.readSegment(reader, i, true)
 	}
@@ -217,7 +217,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
 	testutil.Ok(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	go watcher.Start()
 	expected := seriesCount
@@ -303,7 +303,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 	_, _, err = w.Segments()
 	testutil.Ok(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	go watcher.Start()
 	expected := seriesCount * 2
@@ -368,7 +368,7 @@ func TestReadCheckpoint(t *testing.T) {
 	testutil.Ok(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	go watcher.Start()
 	expectedSeries := seriesCount
@@ -439,7 +439,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) {
 	}
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	watcher.MaxSegment = -1
 	// Set the Watcher's metrics so they're not nil pointers.
@@ -510,7 +510,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 	testutil.Ok(t, err)
 	wt := newWriteToMock()
-	watcher := NewWatcher(nil, wMetrics, nil, "", wt, dir)
+	watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
 	watcher.MaxSegment = -1
 	go watcher.Start()