Mirror of https://github.com/prometheus/prometheus.git (last synced 2025-03-05 20:59:13 -08:00)
* fix bug that would cause us to only read from the WAL on the 15s fallback timer if remote write had fallen behind and is no longer reading from the WAL segment that is currently being written to
* remove unintended logging, fix lint, plus allow test to take slightly longer because cloud CI
* address review feedback
* fix watcher sleeps in test, flu brain is smooth
* increase timeout, unfortunately cloud CI can require a longer timeout

Signed-off-by: Callum Styan <callumstyan@gmail.com>
789 lines · 23 KiB · Go

// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wlog

import (
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/exp/slices"

	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/tsdb/record"
)

const (
	checkpointPeriod   = 5 * time.Second
	segmentCheckPeriod = 100 * time.Millisecond
	consumer           = "consumer"
)

var (
	ErrIgnorable = errors.New("ignore me")
	readTimeout  = 15 * time.Second
)

// WriteTo is an interface used by the Watcher to send the samples it's read
// from the WAL on to somewhere else. Functions will be called concurrently
// and it is left to the implementer to make sure they are safe.
type WriteTo interface {
	// Append and AppendExemplars should block until the samples are fully accepted,
	// whether enqueued in memory or successfully written to their final destination.
	// Once returned, the WAL Watcher will not attempt to pass that data again.
	Append([]record.RefSample) bool
	AppendExemplars([]record.RefExemplar) bool
	AppendHistograms([]record.RefHistogramSample) bool
	AppendFloatHistograms([]record.RefFloatHistogramSample) bool
	StoreSeries([]record.RefSeries, int)

	// The next two methods are intended for garbage collection: first we call
	// UpdateSeriesSegment on all current series.
	UpdateSeriesSegment([]record.RefSeries, int)
	// Then SeriesReset is called to allow the deletion
	// of all series created in a segment lower than the argument.
	SeriesReset(int)
}

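// NOTE: the type below is an illustrative sketch added during editing, not
// part of the original file. It shows the smallest possible WriteTo: one that
// accepts and discards everything it is handed. The name illustrativeNopWriteTo
// is hypothetical; a real consumer (for example remote write's QueueManager)
// buffers the data and forwards it elsewhere.
type illustrativeNopWriteTo struct{}

func (illustrativeNopWriteTo) Append([]record.RefSample) bool                    { return true }
func (illustrativeNopWriteTo) AppendExemplars([]record.RefExemplar) bool         { return true }
func (illustrativeNopWriteTo) AppendHistograms([]record.RefHistogramSample) bool { return true }
func (illustrativeNopWriteTo) AppendFloatHistograms([]record.RefFloatHistogramSample) bool {
	return true
}
func (illustrativeNopWriteTo) StoreSeries([]record.RefSeries, int)         {}
func (illustrativeNopWriteTo) UpdateSeriesSegment([]record.RefSeries, int) {}
func (illustrativeNopWriteTo) SeriesReset(int)                             {}
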
// WriteNotified is used to notify the watcher that data has been written, so
// that it can read it.
type WriteNotified interface {
	Notify()
}

type WatcherMetrics struct {
	recordsRead           *prometheus.CounterVec
	recordDecodeFails     *prometheus.CounterVec
	samplesSentPreTailing *prometheus.CounterVec
	currentSegment        *prometheus.GaugeVec
	notificationsSkipped  *prometheus.CounterVec
}

// Watcher watches the TSDB WAL for a given WriteTo.
type Watcher struct {
	name           string
	writer         WriteTo
	logger         log.Logger
	walDir         string
	lastCheckpoint string
	sendExemplars  bool
	sendHistograms bool
	metrics        *WatcherMetrics
	readerMetrics  *LiveReaderMetrics

	startTime      time.Time
	startTimestamp int64 // the start time as a Prometheus timestamp
	sendSamples    bool

	recordsReadMetric       *prometheus.CounterVec
	recordDecodeFailsMetric prometheus.Counter
	samplesSentPreTailing   prometheus.Counter
	currentSegmentMetric    prometheus.Gauge
	notificationsSkipped    prometheus.Counter

	readNotify chan struct{}
	quit       chan struct{}
	done       chan struct{}

	// For testing, stop when we hit this segment.
	MaxSegment int
}

func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
	m := &WatcherMetrics{
		recordsRead: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: "prometheus",
				Subsystem: "wal_watcher",
				Name:      "records_read_total",
				Help:      "Number of records read by the WAL watcher from the WAL.",
			},
			[]string{consumer, "type"},
		),
		recordDecodeFails: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: "prometheus",
				Subsystem: "wal_watcher",
				Name:      "record_decode_failures_total",
				Help:      "Number of records read by the WAL watcher that resulted in an error when decoding.",
			},
			[]string{consumer},
		),
		samplesSentPreTailing: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: "prometheus",
				Subsystem: "wal_watcher",
				Name:      "samples_sent_pre_tailing_total",
				Help:      "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
			},
			[]string{consumer},
		),
		currentSegment: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: "prometheus",
				Subsystem: "wal_watcher",
				Name:      "current_segment",
				Help:      "Current segment the WAL watcher is reading records from.",
			},
			[]string{consumer},
		),
		notificationsSkipped: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: "prometheus",
				Subsystem: "wal_watcher",
				Name:      "notifications_skipped_total",
				Help:      "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine.",
			},
			[]string{consumer},
		),
	}

	if reg != nil {
		reg.MustRegister(m.recordsRead)
		reg.MustRegister(m.recordDecodeFails)
		reg.MustRegister(m.samplesSentPreTailing)
		reg.MustRegister(m.currentSegment)
		reg.MustRegister(m.notificationsSkipped)
	}

	return m
}

// NewWatcher creates a new WAL watcher for a given WriteTo.
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	return &Watcher{
		logger:         logger,
		writer:         writer,
		metrics:        metrics,
		readerMetrics:  readerMetrics,
		walDir:         filepath.Join(dir, "wal"),
		name:           name,
		sendExemplars:  sendExemplars,
		sendHistograms: sendHistograms,

		readNotify: make(chan struct{}),
		quit:       make(chan struct{}),
		done:       make(chan struct{}),

		MaxSegment: -1,
	}
}

func (w *Watcher) Notify() {
	select {
	case w.readNotify <- struct{}{}:
		return
	default: // Default case so we can exit without blocking.
		// We don't need a buffered channel or any buffering since,
		// for each notification it receives, the watcher will read until EOF.
		w.notificationsSkipped.Inc()
	}
}

func (w *Watcher) setMetrics() {
	// Set up the WAL Watcher's metrics. We do this here rather than in the
	// constructor because of the ordering of creating QueueManagers,
	// stopping them, and then starting new ones in storage/remote/storage.go's ApplyConfig.
	if w.metrics != nil {
		w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
		w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
		w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
		w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
		w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
	}
}

// Start the Watcher.
func (w *Watcher) Start() {
	w.setMetrics()
	level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)

	go w.loop()
}

// Stop the Watcher.
func (w *Watcher) Stop() {
	close(w.quit)
	<-w.done

	// Records read metric has series and samples.
	if w.metrics != nil {
		w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
		w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
		w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
		w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
		w.metrics.currentSegment.DeleteLabelValues(w.name)
	}

	level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
}

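// NOTE: the function below is an illustrative sketch added during editing, not
// part of the original file. It shows one plausible way a caller could wire a
// Watcher together; the function name and parameters are hypothetical, and a
// real caller (remote write's QueueManager) does considerably more.
func illustrateWatcherLifecycle(reg prometheus.Registerer, readerMetrics *LiveReaderMetrics, writer WriteTo, dir string) {
	metrics := NewWatcherMetrics(reg)
	w := NewWatcher(metrics, readerMetrics, log.NewNopLogger(), "example-queue", writer, dir, false, false)

	// Start sets up the per-queue metrics and launches the tailing loop in a goroutine.
	w.Start()

	// The WAL writer can call Notify after appending data so the Watcher reads
	// right away instead of waiting for the readTimeout fallback ticker.
	w.Notify()

	// Stop closes the quit channel, waits for the loop to exit, and deletes the
	// per-queue metric label values.
	w.Stop()
}
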
func (w *Watcher) loop() {
	defer close(w.done)

	// We may encounter failures processing the WAL; we should wait and retry.
	for !isClosed(w.quit) {
		w.SetStartTime(time.Now())
		if err := w.Run(); err != nil {
			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
		}

		select {
		case <-w.quit:
			return
		case <-time.After(5 * time.Second):
		}
	}
}

// Run the watcher, which will tail the WAL until the quit channel is closed
// or an error case is hit.
func (w *Watcher) Run() error {
	var lastSegment int
	var err error

	// We want to ensure this is false across iterations since
	// Run will be called again if there was a failure to read the WAL.
	w.sendSamples = false

	level.Info(w.logger).Log("msg", "Replaying WAL", "queue", w.name)

	// Backfill from the checkpoint first if it exists.
	lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
	if err != nil && !errors.Is(err, record.ErrNotFound) {
		return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
	}

	if err == nil {
		if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
			return fmt.Errorf("readCheckpoint: %w", err)
		}
	}
	w.lastCheckpoint = lastCheckpoint

	currentSegment, err := w.findSegmentForIndex(checkpointIndex)
	if err != nil {
		return err
	}

	level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
	for !isClosed(w.quit) {
		w.currentSegmentMetric.Set(float64(currentSegment))
		level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)

		// Reset the value of lastSegment each iteration; this is to avoid waiting too long
		// between reads if we're reading a segment that is not the most recent one after startup.
		_, lastSegment, err = w.firstAndLast()
		if err != nil {
			return fmt.Errorf("wal.Segments: %w", err)
		}
		tail := currentSegment >= lastSegment

		// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
		// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
		if err := w.watch(currentSegment, tail); err != nil && !errors.Is(err, ErrIgnorable) {
			return err
		}

		// For testing: stop when you hit a specific segment.
		if currentSegment == w.MaxSegment {
			return nil
		}

		currentSegment++
	}

	return nil
}

// findSegmentForIndex finds the first segment greater than or equal to index.
func (w *Watcher) findSegmentForIndex(index int) (int, error) {
	refs, err := w.segments(w.walDir)
	if err != nil {
		return -1, err
	}

	for _, r := range refs {
		if r >= index {
			return r, nil
		}
	}

	return -1, errors.New("failed to find segment for index")
}

func (w *Watcher) firstAndLast() (int, int, error) {
	refs, err := w.segments(w.walDir)
	if err != nil {
		return -1, -1, err
	}

	if len(refs) == 0 {
		return -1, -1, nil
	}
	return refs[0], refs[len(refs)-1], nil
}

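// WAL segment files are named with zero-padded integers ("00000000",
// "00000001", ...), so parsing a file name as an integer yields its segment
// index; non-numeric entries in the directory, such as the checkpoint
// subdirectory, are simply skipped by segments below.
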
// Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
func (w *Watcher) segments(dir string) ([]int, error) {
	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var refs []int
	for _, f := range files {
		k, err := strconv.Atoi(f.Name())
		if err != nil {
			continue
		}
		refs = append(refs, k)
	}
	slices.Sort(refs)
	for i := 0; i < len(refs)-1; i++ {
		if refs[i]+1 != refs[i+1] {
			return nil, errors.New("segments are not sequential")
		}
	}
	return refs, nil
}

func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
	err := w.readSegment(r, segmentNum, tail)

	// Ignore all errors reading to end of segment whilst replaying the WAL.
	if !tail {
		if err != nil && !errors.Is(err, io.EOF) {
			level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
		} else if r.Offset() != size {
			level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
		}
		return ErrIgnorable
	}

	// Otherwise, when we are tailing, non-EOFs are fatal.
	if err != nil && !errors.Is(err, io.EOF) {
		return err
	}
	return nil
}

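// The watch loop below reads from the open segment in response to three
// triggers: a Notify call from the WAL writer (via readNotify), the
// readTimeout fallback ticker for when no notification has arrived recently,
// and the segmentTicker, which detects that a newer segment exists so the
// Watcher can finish this one and move on. The checkpointTicker only drives
// series garbage collection.
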
// Use tail true to indicate that the reader is currently on a segment that is
// actively being written to. If false, assume it's a full segment and we're
// replaying it on start to cache the series records.
func (w *Watcher) watch(segmentNum int, tail bool) error {
	segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
	if err != nil {
		return err
	}
	defer segment.Close()

	reader := NewLiveReader(w.logger, w.readerMetrics, segment)

	size := int64(math.MaxInt64)
	if !tail {
		var err error
		size, err = getSegmentSize(w.walDir, segmentNum)
		if err != nil {
			return fmt.Errorf("getSegmentSize: %w", err)
		}

		return w.readAndHandleError(reader, segmentNum, tail, size)
	}

	checkpointTicker := time.NewTicker(checkpointPeriod)
	defer checkpointTicker.Stop()

	segmentTicker := time.NewTicker(segmentCheckPeriod)
	defer segmentTicker.Stop()

	readTicker := time.NewTicker(readTimeout)
	defer readTicker.Stop()

	gcSem := make(chan struct{}, 1)
	for {
		select {
		case <-w.quit:
			return nil

		case <-checkpointTicker.C:
			// Periodically check if there is a new checkpoint so we can garbage
			// collect labels. As this is considered an optimisation, we ignore
			// errors during checkpoint processing. Doing the process asynchronously
			// allows the current WAL segment to be processed while reading the
			// checkpoint.
			select {
			case gcSem <- struct{}{}:
				go func() {
					defer func() {
						<-gcSem
					}()
					if err := w.garbageCollectSeries(segmentNum); err != nil {
						level.Warn(w.logger).Log("msg", "Error processing checkpoint", "err", err)
					}
				}()
			default:
				// Currently doing a garbage collect, try again later.
			}

		case <-segmentTicker.C:
			_, last, err := w.firstAndLast()
			if err != nil {
				return fmt.Errorf("segments: %w", err)
			}

			// Check if new segments exist.
			if last <= segmentNum {
				continue
			}
			err = w.readSegment(reader, segmentNum, tail)

			// Ignore errors reading to end of segment whilst replaying the WAL.
			if !tail {
				switch {
				case err != nil && !errors.Is(err, io.EOF):
					level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "err", err)
				case reader.Offset() != size:
					level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size)
				}
				return nil
			}

			// Otherwise, when we are tailing, non-EOFs are fatal.
			if err != nil && !errors.Is(err, io.EOF) {
				return err
			}

			return nil

		// We haven't read due to a notification in quite some time, try reading anyway.
		case <-readTicker.C:
			level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
			err := w.readAndHandleError(reader, segmentNum, tail, size)
			if err != nil {
				return err
			}
			// Still want to reset the ticker so we don't read too often.
			readTicker.Reset(readTimeout)

		case <-w.readNotify:
			err := w.readAndHandleError(reader, segmentNum, tail, size)
			if err != nil {
				return err
			}
			// Still want to reset the ticker so we don't read too often.
			readTicker.Reset(readTimeout)
		}
	}
}

func (w *Watcher) garbageCollectSeries(segmentNum int) error {
	dir, _, err := LastCheckpoint(w.walDir)
	if err != nil && !errors.Is(err, record.ErrNotFound) {
		return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
	}

	if dir == "" || dir == w.lastCheckpoint {
		return nil
	}
	w.lastCheckpoint = dir

	index, err := checkpointNum(dir)
	if err != nil {
		return fmt.Errorf("error parsing checkpoint filename: %w", err)
	}

	if index >= segmentNum {
		level.Debug(w.logger).Log("msg", "Current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
		return nil
	}

	level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)

	if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
		return fmt.Errorf("readCheckpoint: %w", err)
	}

	// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
	w.writer.SeriesReset(index)
	return nil
}

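// readSegment below only forwards samples, histograms and exemplar-carrying
// records whose timestamp is newer than the Watcher's start time; anything at
// or before that timestamp is skipped during replay so that old data is not
// resent. The first newer sample also flips sendSamples and logs
// "Done replaying WAL".
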
// Read from a segment and pass the details to w.writer.
// Also used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
	var (
		dec                   record.Decoder
		series                []record.RefSeries
		samples               []record.RefSample
		samplesToSend         []record.RefSample
		exemplars             []record.RefExemplar
		histograms            []record.RefHistogramSample
		histogramsToSend      []record.RefHistogramSample
		floatHistograms       []record.RefFloatHistogramSample
		floatHistogramsToSend []record.RefFloatHistogramSample
	)
	for r.Next() && !isClosed(w.quit) {
		rec := r.Record()
		w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()

		switch dec.Type(rec) {
		case record.Series:
			series, err := dec.Series(rec, series[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			w.writer.StoreSeries(series, segmentNum)

		case record.Samples:
			// If we're not tailing a segment we can ignore any sample records we see.
			// This speeds up replay of the WAL by > 10x.
			if !tail {
				break
			}
			samples, err := dec.Samples(rec, samples[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			for _, s := range samples {
				if s.T > w.startTimestamp {
					if !w.sendSamples {
						w.sendSamples = true
						duration := time.Since(w.startTime)
						level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
					}
					samplesToSend = append(samplesToSend, s)
				}
			}
			if len(samplesToSend) > 0 {
				w.writer.Append(samplesToSend)
				samplesToSend = samplesToSend[:0]
			}

		case record.Exemplars:
			// Skip if experimental "exemplars over remote write" is not enabled.
			if !w.sendExemplars {
				break
			}
			// If we're not tailing a segment we can ignore any exemplar records we see.
			// This speeds up replay of the WAL significantly.
			if !tail {
				break
			}
			exemplars, err := dec.Exemplars(rec, exemplars[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			w.writer.AppendExemplars(exemplars)

		case record.HistogramSamples:
			// Skip if experimental "histograms over remote write" is not enabled.
			if !w.sendHistograms {
				break
			}
			if !tail {
				break
			}
			histograms, err := dec.HistogramSamples(rec, histograms[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			for _, h := range histograms {
				if h.T > w.startTimestamp {
					if !w.sendSamples {
						w.sendSamples = true
						duration := time.Since(w.startTime)
						level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
					}
					histogramsToSend = append(histogramsToSend, h)
				}
			}
			if len(histogramsToSend) > 0 {
				w.writer.AppendHistograms(histogramsToSend)
				histogramsToSend = histogramsToSend[:0]
			}

		case record.FloatHistogramSamples:
			// Skip if experimental "histograms over remote write" is not enabled.
			if !w.sendHistograms {
				break
			}
			if !tail {
				break
			}
			floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			for _, fh := range floatHistograms {
				if fh.T > w.startTimestamp {
					if !w.sendSamples {
						w.sendSamples = true
						duration := time.Since(w.startTime)
						level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
					}
					floatHistogramsToSend = append(floatHistogramsToSend, fh)
				}
			}
			if len(floatHistogramsToSend) > 0 {
				w.writer.AppendFloatHistograms(floatHistogramsToSend)
				floatHistogramsToSend = floatHistogramsToSend[:0]
			}

		case record.Tombstones:

		default:
			// Could be corruption, or reading from a WAL from a newer Prometheus.
			w.recordDecodeFailsMetric.Inc()
		}
	}
	if err := r.Err(); err != nil {
		return fmt.Errorf("segment %d: %w", segmentNum, err)
	}
	return nil
}

// Go through all series in a segment updating the segmentNum, so we can delete older series.
// Used with readCheckpoint - implements segmentReadFn.
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
	var (
		dec    record.Decoder
		series []record.RefSeries
	)
	for r.Next() && !isClosed(w.quit) {
		rec := r.Record()
		w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()

		switch dec.Type(rec) {
		case record.Series:
			series, err := dec.Series(rec, series[:0])
			if err != nil {
				w.recordDecodeFailsMetric.Inc()
				return err
			}
			w.writer.UpdateSeriesSegment(series, segmentNum)

		// Ignore these; we're only interested in series.
		case record.Samples:
		case record.Exemplars:
		case record.Tombstones:

		default:
			// Could be corruption, or reading from a WAL from a newer Prometheus.
			w.recordDecodeFailsMetric.Inc()
		}
	}
	if err := r.Err(); err != nil {
		return fmt.Errorf("segment %d: %w", segmentNum, err)
	}
	return nil
}

func (w *Watcher) SetStartTime(t time.Time) {
	w.startTime = t
	w.startTimestamp = timestamp.FromTime(t)
}

type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error

// Read all the series records from a Checkpoint directory.
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
	level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
	index, err := checkpointNum(checkpointDir)
	if err != nil {
		return fmt.Errorf("checkpointNum: %w", err)
	}

	// Ensure we read the whole contents of every segment in the checkpoint dir.
	segs, err := w.segments(checkpointDir)
	if err != nil {
		return fmt.Errorf("unable to get segments in checkpoint dir: %w", err)
	}
	for _, seg := range segs {
		size, err := getSegmentSize(checkpointDir, seg)
		if err != nil {
			return fmt.Errorf("getSegmentSize: %w", err)
		}

		sr, err := OpenReadSegment(SegmentName(checkpointDir, seg))
		if err != nil {
			return fmt.Errorf("unable to open segment: %w", err)
		}
		defer sr.Close()

		r := NewLiveReader(w.logger, w.readerMetrics, sr)
		if err := readFn(w, r, index, false); err != nil && !errors.Is(err, io.EOF) {
			return fmt.Errorf("readSegment: %w", err)
		}

		if r.Offset() != size {
			return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset())
		}
	}

	level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
	return nil
}

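// checkpointNum extracts the index from a checkpoint directory name; for
// example, "data/wal/checkpoint.00000123" yields 123.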
func checkpointNum(dir string) (int, error) {
	// Checkpoint dir names are in the format checkpoint.000001.
	// dir may contain a hidden directory, so only check the base directory.
	chunks := strings.Split(filepath.Base(dir), ".")
	if len(chunks) != 2 {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}

	result, err := strconv.Atoi(chunks[1])
	if err != nil {
		return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}

	return result, nil
}

// Get size of segment.
func getSegmentSize(dir string, index int) (int64, error) {
	i := int64(-1)
	fi, err := os.Stat(SegmentName(dir, index))
	if err == nil {
		i = fi.Size()
	}
	return i, err
}

func isClosed(c chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}