Various fixes to locking & shutdown for WAL-based remote write.
- Remove data race in the exported highest scrape timestamp.
- Backoff on enqueue should be per-sample: reset the backoff for each sample (sketched below).
- Remove diffKeys, the unused ctx and cancel func in WALWatcher, and 'name' from the writeTo interface; pass the name to the constructor instead.
- Reorder functions in WALWatcher depth-first according to the call graph.
- Fix vendor/modules.txt.
- Split out the various timer periods into consts at the top of the file.
- Move w.currentSegmentMetric.Set close to where we set currentSegment.
- Combine r.Next() and isClosed(w.quit) into a single loop.
- Unnest some ifs in WALWatcher.watch, propagate errors in decodeRecord, and add some new lines to make it easier to read.
- Reorganise checkpoint handling to reduce nesting and make it easier to follow.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
Commit b93bafeee1 (parent 6f69e31398)
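The per-sample backoff item above is the main behavioural change to QueueManager.Append. The following is a minimal, self-contained sketch of that pattern, not the actual Prometheus code: the names enqueueWithBackoff, minBackoff, maxBackoff and quit are invented for the example. The point is that the backoff duration is re-initialised for every sample, so one sample that needed several retries does not inflate the delay applied to the samples that follow it.

package main

import (
    "fmt"
    "time"
)

// enqueueWithBackoff retries each sample until it is accepted, resetting the
// backoff for every new sample and doubling it only while that one sample
// keeps failing, capped at maxBackoff.
func enqueueWithBackoff(samples []int, enqueue func(int) bool, quit <-chan struct{}, minBackoff, maxBackoff time.Duration) {
outer:
    for _, s := range samples {
        backoff := minBackoff // reset per sample
        for {
            select {
            case <-quit:
                return
            default:
            }
            if enqueue(s) {
                continue outer
            }
            time.Sleep(backoff)
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
        }
    }
}

func main() {
    attempts := 0
    // Toy enqueue that fails every other attempt.
    enqueue := func(int) bool { attempts++; return attempts%2 == 0 }
    enqueueWithBackoff([]int{1, 2, 3}, enqueue, make(chan struct{}), time.Millisecond, 8*time.Millisecond)
    fmt.Println("enqueued all samples after", attempts, "attempts")
}
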
@@ -215,7 +215,6 @@ type QueueManager struct {
    shards *shards
    numShards int
    reshardChan chan int

    quit chan struct{}
    wg sync.WaitGroup

@@ -259,7 +258,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high
    t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName)
    t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName)
    t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName)
    t.watcher = NewWALWatcher(logger, t, walDir, startTime)
    t.watcher = NewWALWatcher(logger, client.Name(), t, walDir, startTime)
    t.shards = t.newShards()

    numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))

@@ -310,11 +309,10 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool {
    }
    t.seriesMtx.Unlock()

    backoff := t.cfg.MinBackoff
outer:
    for _, sample := range tempSamples {
        // This will result in spin/busy waiting if the queues are being resharded
        // or shutting down. TODO backoff.
        // This will only loop if the queues are being resharded.
        backoff := t.cfg.MinBackoff
        for {
            select {
            case <-t.quit:

@@ -325,6 +323,7 @@ outer:
            if t.shards.enqueue(sample.ref, sample.ts) {
                continue outer
            }

            t.enqueueRetriesMetric.Inc()
            time.Sleep(time.Duration(backoff))
            backoff = backoff * 2

@@ -359,18 +358,6 @@ func (t *QueueManager) Stop() {
    t.wg.Wait()
}

func (t *QueueManager) Name() string {
    return t.queueName
}

// Find out which series are dropped after relabelling and make sure we have a metric label for them.
func (t *QueueManager) diffKeys(ref uint64, original, relabelled model.LabelSet) {
    numDropped := len(original) - len(relabelled)
    if numDropped == 0 {
        return
    }
}

// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
    temp := make(map[uint64][]prompb.Label, len(series))

@@ -381,8 +368,6 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) {
        }
        t.processExternalLabels(ls)
        rl := relabel.Process(ls, t.relabelConfigs...)

        t.diffKeys(s.Ref, ls, rl)
        if len(rl) == 0 {
            t.droppedSeries[s.Ref] = struct{}{}
            continue

@@ -42,6 +42,7 @@ type Storage struct {
    queues []*QueueManager
    samplesIn *ewmaRate
    samplesInMetric prometheus.Counter
    highestTimestampMtx sync.Mutex
    highestTimestamp int64
    highestTimestampMetric prometheus.Gauge

@@ -14,9 +14,7 @@
package remote

import (
    "context"
    "fmt"
    "io"
    "os"
    "path"
    "strconv"

@@ -33,6 +31,12 @@ import (
    "github.com/prometheus/tsdb/wal"
)

const (
    readPeriod = 10 * time.Millisecond
    checkpointPeriod = 5 * time.Second
    segmentCheckPeriod = 100 * time.Millisecond
)

var (
    watcherSamplesRecordsRead = prometheus.NewCounterVec(
        prometheus.CounterOpts{

@@ -123,14 +127,15 @@ type writeTo interface {
    Append([]tsdb.RefSample) bool
    StoreSeries([]tsdb.RefSeries, int)
    SeriesReset(int)
    Name() string
}

// WALWatcher watches the TSDB WAL for a given WriteTo.
type WALWatcher struct {
    name string
    writer writeTo
    logger log.Logger
    walDir string

    currentSegment int
    lastCheckpoint string
    startTime int64

@@ -144,298 +149,44 @@ type WALWatcher struct {
    samplesSentPreTailing prometheus.Counter
    currentSegmentMetric prometheus.Gauge

    ctx context.Context
    cancel context.CancelFunc
    quit chan struct{}
}

// NewWALWatcher creates a new WAL watcher for a given WriteTo.
func NewWALWatcher(logger log.Logger, writer writeTo, walDir string, startTime int64) *WALWatcher {
func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string, startTime int64) *WALWatcher {
    if logger == nil {
        logger = log.NewNopLogger()
    }
    ctx, cancel := context.WithCancel(context.Background())
    w := &WALWatcher{
        logger: logger,
        writer: writer,
        walDir: path.Join(walDir, "wal"),
        startTime: startTime,
        ctx: ctx,
        cancel: cancel,
        quit: make(chan struct{}),
    }

    w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.writer.Name())
    w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.writer.Name())
    w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.writer.Name())
    w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.writer.Name())
    w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.writer.Name())
    w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.writer.Name())
    w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.writer.Name())
    w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.writer.Name())
    w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(name)
    w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(name)
    w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(name)
    w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(name)
    w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(name)
    w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(name)
    w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(name)
    w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(name)

    return w
}

func (w *WALWatcher) Start() {
    level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.writer.Name())
    level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
    go w.runWatcher()
}

func (w *WALWatcher) Stop() {
    level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.writer.Name())
    level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name)
    close(w.quit)
}

// TODO: fix the exit logic for this function
// The stop param is used to stop at the end of the existing WAL on startup,
// since scraped samples may be written to the latest segment before we finish reading it.
func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) {
    var (
        dec tsdb.RecordDecoder
        series []tsdb.RefSeries
        samples []tsdb.RefSample
        ret bool
    )
    for !isClosed(w.quit) {
        for r.Next() {
            series = series[:0]
            rec := r.Record()
            // If the timestamp is > start then we should Append this sample and exit readSeriesRecords,
            // because this is the first sample written to the WAL after the WAL watcher was started.
            typ := dec.Type(rec)
            if typ == tsdb.RecordSamples {
                samples, err := dec.Samples(rec, samples[:0])
                if err != nil {
                    continue
                }
                for _, s := range samples {
                    if s.T > w.startTime {
                        w.writer.Append(samples)
                        ret = true
                        w.samplesSentPreTailing.Inc()
                    }
                }
                if ret {
                    level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start")
                    level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
                    return
                }
            }
            if typ != tsdb.RecordSeries {
                continue
            }

            series, err := dec.Series(rec, nil)
            if err != nil {
                level.Error(log.With(w.logger)).Log("err", err)
                break
            }

            w.writer.StoreSeries(series, index)
        }
        // Since we only call readSeriesRecords on fully written WAL segments or checkpoints,
        // Error() will only return an error if something actually went wrong when reading
        // a record, either it was invalid or it was only partially written to the WAL.
        if err := r.Err(); err != nil {
            level.Error(w.logger).Log("err", err)
            return
        }
        // Ensure we read all of the bytes in the segment or checkpoint.
        if r.TotalRead() >= stop {
            level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
            return
        }
    }
}

// Read all the series records from a Checkpoint directory.
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
    sr, err := wal.NewSegmentsReader(checkpointDir)
    if err != nil {
        return errors.Wrap(err, "open checkpoint")
    }
    defer sr.Close()

    split := strings.Split(checkpointDir, ".")
    if len(split) != 2 {
        return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir)
    }

    i, err := strconv.Atoi(split[1])
    if err != nil {
        i = w.currentSegment - 1
    }

    size, err := getCheckpointSize(checkpointDir)
    if err != nil {
        level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir)
    }

    w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
    level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
    w.writer.SeriesReset(i)
    return nil
}

// When starting the WAL watcher, there is potentially an existing WAL. In that case, we
// should read to the end of the newest existing segment before reading new records that
// are written to it, storing data from series records along the way.
// Unfortunately this function is duplicates some of TSDB Head.Init().
func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*wal.Segment, *wal.LiveReader, error) {
    // Backfill from the checkpoint first if it exists.
    defer level.Debug(w.logger).Log("msg", "done reading existing WAL")
    dir, startFrom, err := tsdb.LastCheckpoint(walDir)
    if err != nil && err != tsdb.ErrNotFound {
        return nil, nil, errors.Wrap(err, "find last checkpoint")
    }
    level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
    if err == nil {
        w.lastCheckpoint = dir
        err = w.readCheckpoint(dir)
        if err != nil {
            return nil, nil, err
        }
        startFrom++
    }

    // Backfill segments from the last checkpoint onwards if at least 2 segments exist.
    if lastSegment > 0 {
        for i := firstSegment; i < lastSegment; i++ {
            seg, err := wal.OpenReadSegment(wal.SegmentName(walDir, i))
            if err != nil {
                return nil, nil, err
            }
            sz, _ := getSegmentSize(walDir, i)
            w.readSeriesRecords(wal.NewLiveReader(seg), i, sz)
        }
    }

    // We want to start the WAL Watcher from the end of the last segment on start,
    // so we make sure to return the wal.Segment pointer
    segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, lastSegment))
    if err != nil {
        return nil, nil, err
    }

    r := wal.NewLiveReader(segment)
    sz, _ := getSegmentSize(walDir, lastSegment)
    w.readSeriesRecords(r, lastSegment, sz)
    return segment, r, nil
}

func (w *WALWatcher) decodeRecord(rec []byte) error {
    var (
        dec tsdb.RecordDecoder
        series []tsdb.RefSeries
        samples []tsdb.RefSample
    )
    switch dec.Type(rec) {
    case tsdb.RecordSeries:
        series, err := dec.Series(rec, series[:0])
        if err != nil {
            w.recordDecodeFailsMetric.Inc()
            level.Error(log.With(w.logger)).Log("err", err)
            break
        }
        w.seriesReadMetric.Add(float64(len(series)))
        w.writer.StoreSeries(series, w.currentSegment)
    case tsdb.RecordSamples:
        samples, err := dec.Samples(rec, samples[:0])
        if err != nil {
            w.recordDecodeFailsMetric.Inc()
            level.Error(log.With(w.logger)).Log("err", err)
            break
        }
        w.samplesReadMetric.Add(float64(len(samples)))
        // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
        w.writer.Append(samples)
    case tsdb.RecordTombstones:
        w.tombstonesReadMetric.Add(float64(len(samples)))
    case tsdb.RecordInvalid:
        w.invalidReadMetric.Add(float64(len(samples)))
        return errors.New("invalid record")
    default:
        level.Info(w.logger).Log("msg", "unknown TSDB record type in decodeSegment")
        return errors.New("unknown TSDB record type")
    }
    return nil
}

func (w *WALWatcher) readSegment(r *wal.LiveReader) {
    for r.Next() && !isClosed(w.quit) {
        err := w.decodeRecord(r.Record())
        if err != nil {
            level.Error(w.logger).Log("err", err)
        }
    }
    if err := r.Err(); err != nil && err != io.EOF {
        level.Error(w.logger).Log("err", err)
    }
}

func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {
    readTimeout := 10 * time.Millisecond
    readTicker := time.NewTicker(readTimeout)
    defer readTicker.Stop()
    checkpointTicker := time.NewTicker(5 * time.Second)
    defer checkpointTicker.Stop()
    segmentTicker := time.NewTicker(100 * time.Millisecond)
    defer segmentTicker.Stop()

    currentSegmentName := fmt.Sprintf("%08d", w.currentSegment)
    w.currentSegmentMetric.Set(float64(w.currentSegment))

    for {
        select {
        case <-w.quit:
            level.Info(w.logger).Log("msg", "quitting WAL watcher watch loop")
            return errors.New("quit channel")
        case <-checkpointTicker.C:
            // check if there is a new checkpoint
            dir, _, err := tsdb.LastCheckpoint(w.walDir)
            if err != nil && err != tsdb.ErrNotFound {
                continue
            }
            cn, err := checkpointNum(dir)
            if err != nil {
                continue
            }
            // TODO: callum, simplify the nesting here
            if err == nil && dir != w.lastCheckpoint {
                level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir)
                d, err := strconv.Atoi(cn)
                if err != nil {
                    level.Error(w.logger).Log("err", err)
                } else if d < w.currentSegment {
                    w.lastCheckpoint = dir
                    // This potentially takes a long time, should we run it in another go routine?
                    err = w.readCheckpoint(w.lastCheckpoint)
                    if err != nil {
                        level.Error(w.logger).Log("err", err)
                    }
                } else {
                    level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint for now", "current", currentSegmentName, "checkpoint", dir)
                }
            }
        case <-segmentTicker.C:
            // check if new segments exist
            _, last, err := wl.Segments()
            if err != nil {
                level.Error(w.logger).Log("err", err)
                continue
            }
            if last > w.currentSegment {
                w.readSegment(reader)
                level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", currentSegmentName, "new", fmt.Sprintf("%08d", last))
                return nil
            }
        case <-readTicker.C:
            w.readSegment(reader)
        }
    }
}

func (w *WALWatcher) runWatcher() {
    // The WAL dir may not exist when Prometheus first starts up.
    for {

@@ -472,14 +223,11 @@ func (w *WALWatcher) runWatcher() {
        }

        w.currentSegment = last

        if err != nil {
            level.Error(w.logger).Log("err", err)
            return
        }
        w.currentSegmentMetric.Set(float64(w.currentSegment))

        for {
            level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment)

            // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
            // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
            err := w.watch(nw, reader)

@@ -488,7 +236,10 @@ func (w *WALWatcher) runWatcher() {
                level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err)
                return
            }

            w.currentSegment++
            w.currentSegmentMetric.Set(float64(w.currentSegment))

            segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment))
            reader = wal.NewLiveReader(segment)
            // TODO: callum, is this error really fatal?

@@ -499,13 +250,292 @@ func (w *WALWatcher) runWatcher() {
    }
}

func checkpointNum(dir string) (string, error) {

// When starting the WAL watcher, there is potentially an existing WAL. In that case, we
// should read to the end of the newest existing segment before reading new records that
// are written to it, storing data from series records along the way.
// Unfortunately this function is duplicates some of TSDB Head.Init().
func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*wal.Segment, *wal.LiveReader, error) {
    // Backfill from the checkpoint first if it exists.
    defer level.Debug(w.logger).Log("msg", "done reading existing WAL")
    dir, startFrom, err := tsdb.LastCheckpoint(walDir)
    if err != nil && err != tsdb.ErrNotFound {
        return nil, nil, errors.Wrap(err, "find last checkpoint")
    }

    level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
    if err == nil {
        w.lastCheckpoint = dir
        err = w.readCheckpoint(dir)
        if err != nil {
            return nil, nil, err
        }
        startFrom++
    }

    // Backfill segments from the last checkpoint onwards if at least 2 segments exist.
    if lastSegment > 0 {
        for i := firstSegment; i < lastSegment; i++ {
            seg, err := wal.OpenReadSegment(wal.SegmentName(walDir, i))
            if err != nil {
                return nil, nil, err
            }
            sz, _ := getSegmentSize(walDir, i)
            w.readSeriesRecords(wal.NewLiveReader(seg), i, sz)
        }
    }

    // We want to start the WAL Watcher from the end of the last segment on start,
    // so we make sure to return the wal.Segment pointer
    segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, lastSegment))
    if err != nil {
        return nil, nil, err
    }

    r := wal.NewLiveReader(segment)
    sz, _ := getSegmentSize(walDir, lastSegment)
    w.readSeriesRecords(r, lastSegment, sz)
    return segment, r, nil
}

// TODO: fix the exit logic for this function
// The stop param is used to stop at the end of the existing WAL on startup,
// since scraped samples may be written to the latest segment before we finish reading it.
func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) {
    var (
        dec tsdb.RecordDecoder
        series []tsdb.RefSeries
        samples []tsdb.RefSample
        ret bool
    )

    for r.Next() && !isClosed(w.quit) {
        series = series[:0]
        rec := r.Record()
        // If the timestamp is > start then we should Append this sample and exit readSeriesRecords,
        // because this is the first sample written to the WAL after the WAL watcher was started.
        typ := dec.Type(rec)
        if typ == tsdb.RecordSamples {
            samples, err := dec.Samples(rec, samples[:0])
            if err != nil {
                continue
            }
            for _, s := range samples {
                if s.T > w.startTime {
                    w.writer.Append(samples)
                    ret = true
                    w.samplesSentPreTailing.Inc()
                }
            }
            if ret {
                level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start")
                level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
                return
            }
        }
        if typ != tsdb.RecordSeries {
            continue
        }

        series, err := dec.Series(rec, nil)
        if err != nil {
            level.Error(log.With(w.logger)).Log("err", err)
            break
        }

        w.writer.StoreSeries(series, index)
    }

    // Since we only call readSeriesRecords on fully written WAL segments or checkpoints,
    // Error() will only return an error if something actually went wrong when reading
    // a record, either it was invalid or it was only partially written to the WAL.
    if err := r.Err(); err != nil {
        level.Error(w.logger).Log("err", err)
        return
    }

    // Ensure we read all of the bytes in the segment or checkpoint.
    if r.TotalRead() >= stop {
        level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index)
        return
    }
}

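The combined `r.Next() && !isClosed(w.quit)` condition above (and in readSegment further down) relies on a small non-blocking helper that is not shown in this diff. It is presumably the standard Go idiom sketched below; this is an assumption about its shape, not code copied from wal_watcher.go.

package main

import "fmt"

// isClosed reports whether the quit channel has been closed, without blocking:
// the default case fires whenever no value is ready on c.
func isClosed(c chan struct{}) bool {
    select {
    case <-c:
        return true
    default:
        return false
    }
}

func main() {
    quit := make(chan struct{})
    fmt.Println(isClosed(quit)) // false
    close(quit)
    fmt.Println(isClosed(quit)) // true
}
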
func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error {

    readTicker := time.NewTicker(readPeriod)
    defer readTicker.Stop()

    checkpointTicker := time.NewTicker(checkpointPeriod)
    defer checkpointTicker.Stop()

    segmentTicker := time.NewTicker(segmentCheckPeriod)
    defer segmentTicker.Stop()

    for {
        select {
        case <-w.quit:
            level.Info(w.logger).Log("msg", "quitting WAL watcher watch loop")
            return errors.New("quit channel")

        case <-checkpointTicker.C:
            // Periodically check if there is a new checkpoint.
            // As this is considered an optimisation, we ignore errors during
            // checkpoint processing.

            dir, _, err := tsdb.LastCheckpoint(w.walDir)
            if err != nil && err != tsdb.ErrNotFound {
                level.Error(w.logger).Log("msg", "error getting last checkpoint", "err", err)
                continue
            }

            if dir == w.lastCheckpoint {
                continue
            }

            level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir)

            d, err := checkpointNum(dir)
            if err != nil {
                level.Error(w.logger).Log("msg", "error parsing checkpoint", "err", err)
                continue
            }

            if d >= w.currentSegment {
                level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", w.currentSegment), "checkpoint", dir)
                continue
            }

            w.lastCheckpoint = dir
            // This potentially takes a long time, should we run it in another go routine?
            err = w.readCheckpoint(w.lastCheckpoint)
            if err != nil {
                level.Error(w.logger).Log("err", err)
            }

        case <-segmentTicker.C:
            // check if new segments exist
            _, last, err := wl.Segments()
            if err != nil {
                return errors.Wrap(err, "segments")
            }

            if last <= w.currentSegment {
                continue
            }

            if err := w.readSegment(reader); err != nil {
                // Ignore errors reading to end of segment, as we're going to move to
                // next segment now.
                level.Error(w.logger).Log("msg", "error reading to end of segment", "err", err)
            }

            level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", w.currentSegment), "new", fmt.Sprintf("%08d", last))
            return nil

        case <-readTicker.C:
            if err := w.readSegment(reader); err != nil {
                level.Error(w.logger).Log("err", err)
                return err
            }
        }
    }
}

func (w *WALWatcher) readSegment(r *wal.LiveReader) error {
    for r.Next() && !isClosed(w.quit) {
        err := w.decodeRecord(r.Record())

        // Intentionally skip over record decode errors.
        if err != nil {
            level.Error(w.logger).Log("err", err)
        }
    }
    return r.Err()
}

func (w *WALWatcher) decodeRecord(rec []byte) error {
    var (
        dec tsdb.RecordDecoder
        series []tsdb.RefSeries
        samples []tsdb.RefSample
    )
    switch dec.Type(rec) {
    case tsdb.RecordSeries:
        series, err := dec.Series(rec, series[:0])
        if err != nil {
            w.recordDecodeFailsMetric.Inc()
            return err
        }
        w.seriesReadMetric.Add(float64(len(series)))
        w.writer.StoreSeries(series, w.currentSegment)

    case tsdb.RecordSamples:
        samples, err := dec.Samples(rec, samples[:0])
        if err != nil {
            w.recordDecodeFailsMetric.Inc()
            return err
        }
        w.samplesReadMetric.Add(float64(len(samples)))
        // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
        w.writer.Append(samples)

    case tsdb.RecordTombstones:
        w.tombstonesReadMetric.Add(float64(len(samples)))

    case tsdb.RecordInvalid:
        w.invalidReadMetric.Add(float64(len(samples)))
        return errors.New("invalid record")

    default:
        w.recordDecodeFailsMetric.Inc()
        return errors.New("unknown TSDB record type")
    }
    return nil
}

// Read all the series records from a Checkpoint directory.
func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
    sr, err := wal.NewSegmentsReader(checkpointDir)
    if err != nil {
        return errors.Wrap(err, "open checkpoint")
    }
    defer sr.Close()

    split := strings.Split(checkpointDir, ".")
    if len(split) != 2 {
        return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir)
    }

    i, err := strconv.Atoi(split[1])
    if err != nil {
        i = w.currentSegment - 1
    }

    size, err := getCheckpointSize(checkpointDir)
    if err != nil {
        level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir)
        return errors.Wrap(err, "get checkpoint size")
    }

    w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
    level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
    w.writer.SeriesReset(i)
    return nil
}

func checkpointNum(dir string) (int, error) {
    // Checkpoint dir names are in the format checkpoint.000001
    chunks := strings.Split(dir, ".")
    if len(chunks) != 2 {
        return "", errors.Errorf("invalid checkpoint dir string: %s", dir)
        return 0, errors.Errorf("invalid checkpoint dir string: %s", dir)
    }
    return chunks[1], nil

    result, err := strconv.Atoi(chunks[1])
    if err != nil {
        return 0, errors.Errorf("invalid checkpoint dir string: %s", dir)
    }

    return result, nil
}

func getCheckpointSize(dir string) (int64, error) {

@@ -70,10 +70,6 @@ func (wtm *writeToMock) SeriesReset(index int) {
    }
}

func (wtm *writeToMock) Name() string {
    return ""
}

func newWriteToMock() *writeToMock {
    return &writeToMock{
        seriesLabels: make(map[uint64][]prompb.Label),

@@ -140,7 +136,7 @@ func Test_readToEnd_noCheckpoint(t *testing.T) {

    wt := newWriteToMock()
    st := timestamp.FromTime(time.Now())
    watcher := NewWALWatcher(nil, wt, dir, st)
    watcher := NewWALWatcher(nil, "", wt, dir, st)
    _, _, err = watcher.readToEnd(wdir, first, last)
    testutil.Ok(t, err)
    testutil.Equals(t, seriesCount, len(wt.seriesLabels))

@@ -218,7 +214,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) {

    wt := newWriteToMock()
    st := timestamp.FromTime(time.Now())
    watcher := NewWALWatcher(nil, wt, dir, st)
    watcher := NewWALWatcher(nil, "", wt, dir, st)
    _, _, err = watcher.readToEnd(wdir, first, last)
    testutil.Ok(t, err)
    testutil.Equals(t, seriesCount*10*2, len(wt.seriesLabels))

@@ -274,7 +270,7 @@ func Test_readCheckpoint(t *testing.T) {

    wt := newWriteToMock()
    st := timestamp.FromTime(time.Now())
    watcher := NewWALWatcher(nil, wt, dir, st)
    watcher := NewWALWatcher(nil, "", wt, dir, st)
    _, _, err = watcher.readToEnd(wdir, first, last)
    testutil.Ok(t, err)
    testutil.Equals(t, seriesCount*10, len(wt.seriesLabels))

@@ -327,7 +323,7 @@ func Test_checkpoint_seriesReset(t *testing.T) {

    wt := newWriteToMock()
    st := timestamp.FromTime(time.Now())
    watcher := NewWALWatcher(nil, wt, dir, st)
    watcher := NewWALWatcher(nil, "", wt, dir, st)
    _, _, err = watcher.readToEnd(wdir, first, last)
    testutil.Ok(t, err)
    testutil.Equals(t, seriesCount*10, len(wt.seriesLabels))

@@ -352,7 +348,7 @@ func Test_decodeRecord(t *testing.T) {

    wt := newWriteToMock()
    st := timestamp.FromTime(time.Now())
    watcher := NewWALWatcher(nil, wt, dir, st)
    watcher := NewWALWatcher(nil, "", wt, dir, st)

    // decode a series record
    enc := tsdb.RecordEncoder{}

@@ -20,32 +20,47 @@ import (

// Appender implements scrape.Appendable.
func (s *Storage) Appender() (storage.Appender, error) {
    return s, nil
    return &timestampTracker{
        storage: s,
    }, nil
}

type timestampTracker struct {
    storage *Storage
    samples int64
    highestTimestamp int64
}

// Add implements storage.Appender.
func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) {
    s.samplesIn.incr(1)
    s.samplesInMetric.Inc()
    if t > s.highestTimestamp {
        s.highestTimestamp = t
func (t *timestampTracker) Add(_ labels.Labels, ts int64, v float64) (uint64, error) {
    t.samples++
    if ts > t.highestTimestamp {
        t.highestTimestamp = ts
    }
    return 0, nil
}

// AddFast implements storage.Appender.
func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error {
    _, err := s.Add(l, t, v)
func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float64) error {
    _, err := t.Add(l, ts, v)
    return err
}

// Commit implements storage.Appender.
func (s *Storage) Commit() error {
    s.highestTimestampMetric.Set(float64(s.highestTimestamp))
func (t *timestampTracker) Commit() error {
    t.storage.samplesIn.incr(t.samples)
    t.storage.samplesInMetric.Add(float64(t.samples))

    t.storage.highestTimestampMtx.Lock()
    defer t.storage.highestTimestampMtx.Unlock()
    if t.highestTimestamp > t.storage.highestTimestamp {
        t.storage.highestTimestamp = t.highestTimestamp
        t.storage.highestTimestampMetric.Set(float64(t.highestTimestamp))
    }
    return nil
}

// Rollback implements storage.Appender.
func (*Storage) Rollback() error {
func (*timestampTracker) Rollback() error {
    return nil
}

vendor/modules.txt (vendored, 4 changes)
@@ -243,12 +243,12 @@ github.com/prometheus/procfs/xfs
github.com/prometheus/procfs/internal/util
# github.com/prometheus/tsdb v0.4.0
github.com/prometheus/tsdb
github.com/prometheus/tsdb/fileutil
github.com/prometheus/tsdb/wal
github.com/prometheus/tsdb/labels
github.com/prometheus/tsdb/chunkenc
github.com/prometheus/tsdb/chunks
github.com/prometheus/tsdb/fileutil
github.com/prometheus/tsdb/index
github.com/prometheus/tsdb/wal
# github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13
github.com/samuel/go-zookeeper/zk
# github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d