Various improvements to WAL-based remote write.

- Use the queue name in WAL watcher logging.
- Don't return from watch if the reader error was EOF.
- Fix the sample timestamp check logic that decides which samples we send.
- Refactor so we don't need readToEnd/readSeriesRecords.
- Fix the wal_watcher tests, since readToEnd no longer exists.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Author: Callum Styan, 2019-01-18 12:31:36 -08:00 (committed by Tom Wilkie)
Parent: b93bafeee1
Commit: 37e35f9e0c
3 changed files with 202 additions and 178 deletions
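The "sample timestamp check" fix amounts to filtering decoded samples against the watcher's start time before handing them to remote write, as the wal_watcher.go hunks below show. A minimal standalone sketch of that check (the filterSamples helper is illustrative and not part of this commit):

package remote

import "github.com/prometheus/tsdb"

// filterSamples mirrors the check added to decodeRecord below: only samples
// with a timestamp newer than the watcher's start time are forwarded.
// Illustrative helper, not part of this commit.
func filterSamples(samples []tsdb.RefSample, startTime int64) []tsdb.RefSample {
	var send []tsdb.RefSample
	for _, s := range samples {
		if s.T > startTime {
			send = append(send, s)
		}
	}
	return send
}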


@ -268,6 +268,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
sentBatchDuration.WithLabelValues(t.queueName)
succeededSamplesTotal.WithLabelValues(t.queueName)
failedSamplesTotal.WithLabelValues(t.queueName)
droppedSamplesTotal.WithLabelValues(t.queueName)
retriedSamplesTotal.WithLabelValues(t.queueName)
// Reset pending samples metric to 0.
t.pendingSamplesMetric.Set(0)
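The NewQueueManager hunk above pre-creates each per-queue counter child by calling WithLabelValues with the queue name, so the labelled series is exported at 0 before any samples flow through the queue. A minimal client_golang sketch of that pattern (metric and label names here are illustrative, not necessarily the ones used by Prometheus):

package remote

import "github.com/prometheus/client_golang/prometheus"

var droppedSamplesTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "prometheus",
		Subsystem: "remote_storage",
		Name:      "dropped_samples_total",
		Help:      "Total number of samples dropped by the queue.",
	},
	[]string{"queue"},
)

func init() {
	prometheus.MustRegister(droppedSamplesTotal)
}

// initQueueMetrics instantiates the child for this queue so the series is
// exported with a value of 0 immediately, rather than appearing only after
// the first increment. Illustrative helper, not part of this commit.
func initQueueMetrics(queueName string) {
	droppedSamplesTotal.WithLabelValues(queueName)
}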


@ -15,6 +15,8 @@ package remote
import (
"fmt"
"io"
"math"
"os"
"path"
"strconv"
@ -162,17 +164,18 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string
writer: writer,
walDir: path.Join(walDir, "wal"),
startTime: startTime,
name: name,
quit: make(chan struct{}),
}
w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(name)
w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(name)
w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(name)
w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(name)
w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(name)
w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(name)
w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(name)
w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(name)
w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.name)
w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.name)
w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.name)
w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.name)
w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.name)
w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.name)
w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.name)
w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.name)
return w
}
@ -214,23 +217,46 @@ func (w *WALWatcher) runWatcher() {
return
}
// Read series records in the current WAL and latest checkpoint, get the segment pointer back.
// TODO: callum, handle maintaining the WAL pointer somehow across apply configs?
segment, reader, err := w.readToEnd(w.walDir, first, last)
// Backfill from the checkpoint first if it exists.
dir, _, err := tsdb.LastCheckpoint(w.walDir)
if err != nil && err != tsdb.ErrNotFound {
level.Error(w.logger).Log("msg", "error looking for existing checkpoint, some samples may be dropped", "err", errors.Wrap(err, "find last checkpoint"))
}
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
if err == nil {
w.lastCheckpoint = dir
err = w.readCheckpoint(dir)
if err != nil {
level.Error(w.logger).Log("msg", "error reading existing checkpoint, some samples may be dropped", "err", err)
}
}
w.currentSegment = first
w.currentSegmentMetric.Set(float64(w.currentSegment))
segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment))
// TODO: callum, is this error really fatal?
if err != nil {
level.Error(w.logger).Log("err", err)
return
}
w.currentSegment = last
w.currentSegmentMetric.Set(float64(w.currentSegment))
reader := wal.NewLiveReader(segment)
tail := false
for {
level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment)
// If we've replayed the existing WAL, start tailing.
if w.currentSegment == last {
tail = true
}
if tail {
level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment)
} else {
level.Info(w.logger).Log("msg", "replaying segment", "segment", w.currentSegment)
}
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
err := w.watch(nw, reader)
err := w.watch(nw, reader, tail)
segment.Close()
if err != nil {
level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err)
@ -241,126 +267,19 @@ func (w *WALWatcher) runWatcher() {
w.currentSegmentMetric.Set(float64(w.currentSegment))
segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment))
reader = wal.NewLiveReader(segment)
// TODO: callum, is this error really fatal?
if err != nil {
level.Error(w.logger).Log("err", err)
return
}
reader = wal.NewLiveReader(segment)
}
}
// When starting the WAL watcher, there is potentially an existing WAL. In that case, we
// should read to the end of the newest existing segment before reading new records that
// are written to it, storing data from series records along the way.
// Unfortunately this function duplicates some of TSDB's Head.Init().
func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*wal.Segment, *wal.LiveReader, error) {
// Backfill from the checkpoint first if it exists.
defer level.Debug(w.logger).Log("msg", "done reading existing WAL")
dir, startFrom, err := tsdb.LastCheckpoint(walDir)
if err != nil && err != tsdb.ErrNotFound {
return nil, nil, errors.Wrap(err, "find last checkpoint")
}
level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
if err == nil {
w.lastCheckpoint = dir
err = w.readCheckpoint(dir)
if err != nil {
return nil, nil, err
}
startFrom++
}
// Backfill segments from the last checkpoint onwards if at least 2 segments exist.
if lastSegment > 0 {
for i := firstSegment; i < lastSegment; i++ {
seg, err := wal.OpenReadSegment(wal.SegmentName(walDir, i))
if err != nil {
return nil, nil, err
}
sz, _ := getSegmentSize(walDir, i)
w.readSeriesRecords(wal.NewLiveReader(seg), i, sz)
}
}
// We want to start the WAL Watcher from the end of the last segment on start,
// so we make sure to return the wal.Segment pointer.
segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, lastSegment))
if err != nil {
return nil, nil, err
}
r := wal.NewLiveReader(segment)
sz, _ := getSegmentSize(walDir, lastSegment)
w.readSeriesRecords(r, lastSegment, sz)
return segment, r, nil
}
// TODO: fix the exit logic for this function
// The stop param is used to stop at the end of the existing WAL on startup,
// since scraped samples may be written to the latest segment before we finish reading it.
func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) {
var (
dec tsdb.RecordDecoder
series []tsdb.RefSeries
samples []tsdb.RefSample
ret bool
)
for r.Next() && !isClosed(w.quit) {
series = series[:0]
rec := r.Record()
// If the timestamp is > start then we should Append this sample and exit readSeriesRecords,
// because this is the first sample written to the WAL after the WAL watcher was started.
typ := dec.Type(rec)
if typ == tsdb.RecordSamples {
samples, err := dec.Samples(rec, samples[:0])
if err != nil {
continue
}
for _, s := range samples {
if s.T > w.startTime {
w.writer.Append(samples)
ret = true
w.samplesSentPreTailing.Inc()
}
}
if ret {
level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start")
level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
return
}
}
if typ != tsdb.RecordSeries {
continue
}
series, err := dec.Series(rec, nil)
if err != nil {
level.Error(log.With(w.logger)).Log("err", err)
break
}
w.writer.StoreSeries(series, index)
}
// Since we only call readSeriesRecords on fully written WAL segments or checkpoints,
// Err() will only return an error if something actually went wrong when reading
// a record: either it was invalid or it was only partially written to the WAL.
if err := r.Err(); err != nil {
level.Error(w.logger).Log("err", err)
return
}
// Ensure we read all of the bytes in the segment or checkpoint.
if r.TotalRead() >= stop {
level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
return
}
}
func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader, tail bool) error {
readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
@ -370,6 +289,19 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
segmentTicker := time.NewTicker(segmentCheckPeriod)
defer segmentTicker.Stop()
// If we're replaying the segment we need to know the size of the file to know
// when to return from watch and move on to the next segment.
size := int64(math.MaxInt64)
if !tail {
segmentTicker.Stop()
checkpointTicker.Stop()
var err error
size, err = getSegmentSize(w.walDir, w.currentSegment)
if err != nil {
level.Error(w.logger).Log("msg", "error getting segment size", "segment", w.currentSegment)
return errors.Wrap(err, "get segment size")
}
}
for {
select {
@ -411,14 +343,16 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
if err != nil {
level.Error(w.logger).Log("err", err)
}
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
w.writer.SeriesReset(d)
case <-segmentTicker.C:
// check if new segments exist
_, last, err := wl.Segments()
if err != nil {
return errors.Wrap(err, "segments")
}
// Check if new segments exist.
if last <= w.currentSegment {
continue
}
@ -433,10 +367,14 @@ func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
return nil
case <-readTicker.C:
if err := w.readSegment(reader); err != nil {
if err := w.readSegment(reader); err != nil && err != io.EOF {
level.Error(w.logger).Log("err", err)
return err
}
if reader.TotalRead() >= size && !tail {
level.Info(w.logger).Log("msg", "done replaying segment", "segment", w.currentSegment, "size", size, "read", reader.TotalRead())
return nil
}
}
}
}
@ -475,9 +413,19 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
w.recordDecodeFailsMetric.Inc()
return err
}
w.samplesReadMetric.Add(float64(len(samples)))
// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
w.writer.Append(samples)
var send []tsdb.RefSample
for _, s := range samples {
if s.T > w.startTime {
send = append(send, s)
}
}
if len(send) > 0 {
// We don't want to count samples read prior to the start timestamp, so that
// we can compare samples in vs. samples read and succeeded samples.
w.samplesReadMetric.Add(float64(len(samples)))
// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
w.writer.Append(send)
}
case tsdb.RecordTombstones:
w.tombstonesReadMetric.Add(float64(len(samples)))
@ -495,31 +443,26 @@ func (w *WALWatcher) decodeRecord(rec []byte) error {
// Read all the series records from a Checkpoint directory.
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
level.Info(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir)
sr, err := wal.NewSegmentsReader(checkpointDir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer sr.Close()
split := strings.Split(checkpointDir, ".")
if len(split) != 2 {
return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir)
}
i, err := strconv.Atoi(split[1])
if err != nil {
i = w.currentSegment - 1
}
size, err := getCheckpointSize(checkpointDir)
if err != nil {
level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir)
return errors.Wrap(err, "get checkpoint size")
}
w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
r := wal.NewLiveReader(sr)
w.readSegment(r)
if r.TotalRead() != size {
level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint")
}
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
w.writer.SeriesReset(i)
return nil
}
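readCheckpoint derives the index it passes to SeriesReset from the checkpoint directory's name; TSDB names checkpoint directories checkpoint.NNNNNN, with the number derived from the WAL segment the checkpoint covers up to. A standalone sketch of that parse (the checkpointIndex helper is illustrative, not part of this commit):

package remote

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// checkpointIndex extracts the index from a directory such as ".../checkpoint.000032".
// Illustrative helper, not part of this commit.
func checkpointIndex(checkpointDir string) (int, error) {
	parts := strings.Split(filepath.Base(checkpointDir), ".")
	if len(parts) != 2 {
		return 0, fmt.Errorf("unexpected checkpoint dir name: %s", checkpointDir)
	}
	// Atoi tolerates the leading zeros, so "000032" parses as 32.
	return strconv.Atoi(parts[1])
}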


@ -18,6 +18,7 @@ import (
"math/rand"
"os"
"path"
"sync"
"testing"
"time"
@ -33,6 +34,7 @@ import (
type writeToMock struct {
samplesAppended int
seriesLabels map[uint64][]prompb.Label
seriesLock sync.Mutex
seriesSegmentIndexes map[uint64]int
}
@ -51,8 +53,8 @@ func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) {
temp[s.Ref] = labelsetToLabelsProto(ls)
}
// wtm.seriesMtx.Lock()
// defer t.seriesMtx.Unlock()
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
for ref, labels := range temp {
wtm.seriesLabels[ref] = labels
wtm.seriesSegmentIndexes[ref] = index
@ -62,6 +64,8 @@ func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) {
func (wtm *writeToMock) SeriesReset(index int) {
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
for k, v := range wtm.seriesSegmentIndexes {
if v < index {
delete(wtm.seriesLabels, k)
@ -70,6 +74,12 @@ func (wtm *writeToMock) SeriesReset(index int) {
}
}
func (wtm *writeToMock) checkNumLabels() int {
wtm.seriesLock.Lock()
defer wtm.seriesLock.Unlock()
return len(wtm.seriesLabels)
}
func newWriteToMock() *writeToMock {
return &writeToMock{
seriesLabels: make(map[uint64][]prompb.Label),
@ -77,12 +87,6 @@ func newWriteToMock() *writeToMock {
}
}
// we need a way to check the value of the wal watcher records read metrics, the samples and series records
// with these we could write some example segments and checkpoints and then write tests for readSegment/watch
// to see if we get back the right number of series records/samples records/etc., and that we read a whole checkpoint
// on startup and when a new one is created
//
// we could do the same thing for readToEnd, readCheckpoint, readSeriesRecords, etc.
func Test_readToEnd_noCheckpoint(t *testing.T) {
pageSize := 32 * 1024
const seriesCount = 10
@ -98,7 +102,6 @@ func Test_readToEnd_noCheckpoint(t *testing.T) {
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
testutil.Ok(t, err)
// var input [][]byte
var recs [][]byte
enc := tsdb.RecordEncoder{}
@ -131,15 +134,28 @@ func Test_readToEnd_noCheckpoint(t *testing.T) {
}
testutil.Ok(t, w.Log(recs...))
first, last, err := w.Segments()
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
st := timestamp.FromTime(time.Now())
watcher := NewWALWatcher(nil, "", wt, dir, st)
_, _, err = watcher.readToEnd(wdir, first, last)
testutil.Ok(t, err)
testutil.Equals(t, seriesCount, len(wt.seriesLabels))
go watcher.Start()
i := 0
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
if wt.checkNumLabels() >= seriesCount {
break
}
i++
if i >= 10 {
break
}
}
watcher.Stop()
ticker.Stop()
testutil.Equals(t, seriesCount, wt.checkNumLabels())
}
func Test_readToEnd_withCheckpoint(t *testing.T) {
@ -161,7 +177,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) {
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
testutil.Ok(t, err)
// write to the initial segment then checkpoint
// Write to the initial segment then checkpoint.
for i := 0; i < seriesCount*10; i++ {
ref := i + 100
series := enc.Series([]tsdb.RefSeries{
@ -187,7 +203,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) {
tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0)
w.Truncate(32)
// write more records after checkpointing
// Write more records after checkpointing.
for i := 0; i < seriesCount*10; i++ {
series := enc.Series([]tsdb.RefSeries{
tsdb.RefSeries{
@ -209,15 +225,27 @@ func Test_readToEnd_withCheckpoint(t *testing.T) {
}
}
first, last, err := w.Segments()
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
st := timestamp.FromTime(time.Now())
watcher := NewWALWatcher(nil, "", wt, dir, st)
_, _, err = watcher.readToEnd(wdir, first, last)
testutil.Ok(t, err)
testutil.Equals(t, seriesCount*10*2, len(wt.seriesLabels))
go watcher.Start()
i := 0
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
if wt.checkNumLabels() >= seriesCount*10*2 {
break
}
i++
if i >= 20 {
break
}
}
watcher.Stop()
ticker.Stop()
testutil.Equals(t, seriesCount*10*2, wt.checkNumLabels())
}
func Test_readCheckpoint(t *testing.T) {
@ -239,7 +267,7 @@ func Test_readCheckpoint(t *testing.T) {
w, err := wal.NewSize(nil, nil, wdir, 128*pageSize)
testutil.Ok(t, err)
// write to the initial segment then checkpoint
// Write to the initial segment then checkpoint.
for i := 0; i < seriesCount*10; i++ {
ref := i + 100
series := enc.Series([]tsdb.RefSeries{
@ -265,15 +293,29 @@ func Test_readCheckpoint(t *testing.T) {
tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0)
w.Truncate(32)
first, last, err := w.Segments()
// Start read after checkpoint, no more data written.
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
st := timestamp.FromTime(time.Now())
watcher := NewWALWatcher(nil, "", wt, dir, st)
_, _, err = watcher.readToEnd(wdir, first, last)
testutil.Ok(t, err)
testutil.Equals(t, seriesCount*10, len(wt.seriesLabels))
go watcher.Start()
i := 0
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
if wt.checkNumLabels() >= seriesCount*10 {
break
}
i++
if i >= 8 {
break
}
}
watcher.Stop()
ticker.Stop()
testutil.Equals(t, seriesCount*10, wt.checkNumLabels())
}
func Test_checkpoint_seriesReset(t *testing.T) {
@ -291,10 +333,9 @@ func Test_checkpoint_seriesReset(t *testing.T) {
enc := tsdb.RecordEncoder{}
w, err := wal.NewSize(nil, nil, wdir, pageSize)
// w.
testutil.Ok(t, err)
// write to the initial segment then checkpoint
// Write to the initial segment, then checkpoint later.
for i := 0; i < seriesCount*10; i++ {
ref := i + 100
series := enc.Series([]tsdb.RefSeries{
@ -318,15 +359,29 @@ func Test_checkpoint_seriesReset(t *testing.T) {
}
}
first, last, err := w.Segments()
_, _, err = w.Segments()
testutil.Ok(t, err)
wt := newWriteToMock()
st := timestamp.FromTime(time.Now())
watcher := NewWALWatcher(nil, "", wt, dir, st)
_, _, err = watcher.readToEnd(wdir, first, last)
testutil.Ok(t, err)
testutil.Equals(t, seriesCount*10, len(wt.seriesLabels))
go watcher.Start()
i := 0
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
if wt.checkNumLabels() >= seriesCount*10 {
break
}
i++
if i >= 50 {
break
}
}
watcher.Stop()
ticker.Stop()
testutil.Equals(t, seriesCount*10, wt.checkNumLabels())
// If you modify the checkpoint and truncate segment #'s, run the test to see how
// many series records you end up with, and change the last Equals check accordingly.
@ -347,8 +402,8 @@ func Test_decodeRecord(t *testing.T) {
defer os.RemoveAll(dir)
wt := newWriteToMock()
st := timestamp.FromTime(time.Now())
watcher := NewWALWatcher(nil, "", wt, dir, st)
// st := timestamp.FromTime(time.Now().Add(-10 * time.Second))
watcher := NewWALWatcher(nil, "", wt, dir, 0)
// decode a series record
enc := tsdb.RecordEncoder{}
@ -365,3 +420,28 @@ func Test_decodeRecord(t *testing.T) {
testutil.Equals(t, 2, wt.samplesAppended)
}
func Test_decodeRecord_afterStart(t *testing.T) {
dir, err := ioutil.TempDir("", "decodeRecord")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wt := newWriteToMock()
// st := timestamp.FromTime(time.Now().Add(-10 * time.Second))
watcher := NewWALWatcher(nil, "", wt, dir, 1)
// decode a series record
enc := tsdb.RecordEncoder{}
buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil)
watcher.decodeRecord(buf)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(wt.seriesLabels))
// decode a samples record
buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil)
watcher.decodeRecord(buf)
testutil.Ok(t, err)
testutil.Equals(t, 1, wt.samplesAppended)
}
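The new tests all share the same shape: start the watcher, poll checkNumLabels on a 100 ms ticker up to a per-test iteration cap, then stop the watcher and assert. If that pattern keeps growing it could be factored into a small helper along these lines (a sketch against the writeToMock above, not part of this commit):

// waitForSeries polls the mock until it has stored at least want series,
// giving up after maxTicks * 100ms. Illustrative helper, not part of this commit.
func waitForSeries(wt *writeToMock, want, maxTicks int) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < maxTicks; i++ {
		<-ticker.C
		if wt.checkNumLabels() >= want {
			return
		}
	}
}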