mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-10 05:17:28 -08:00
9e43ad2e37
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Signed-off-by: machine424 <ayoubmrini424@gmail.com>
742 lines
22 KiB
Go
742 lines
22 KiB
Go
// Copyright 2018 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package wlog
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
)
|
|
|
|
const (
|
|
checkpointPeriod = 5 * time.Second
|
|
segmentCheckPeriod = 100 * time.Millisecond
|
|
consumer = "consumer"
|
|
)
|
|
|
|
var (
|
|
ErrIgnorable = errors.New("ignore me")
|
|
readTimeout = 15 * time.Second
|
|
)
|
|
|
|
// WriteTo is an interface used by the Watcher to send the samples it's read
|
|
// from the WAL on to somewhere else. Functions will be called concurrently
|
|
// and it is left to the implementer to make sure they are safe.
|
|
type WriteTo interface {
|
|
// Append and AppendExemplar should block until the samples are fully accepted,
|
|
// whether enqueued in memory or successfully written to it's final destination.
|
|
// Once returned, the WAL Watcher will not attempt to pass that data again.
|
|
Append([]record.RefSample) bool
|
|
AppendExemplars([]record.RefExemplar) bool
|
|
AppendHistograms([]record.RefHistogramSample) bool
|
|
AppendFloatHistograms([]record.RefFloatHistogramSample) bool
|
|
StoreSeries([]record.RefSeries, int)
|
|
StoreMetadata([]record.RefMetadata)
|
|
|
|
// Next two methods are intended for garbage-collection: first we call
|
|
// UpdateSeriesSegment on all current series
|
|
UpdateSeriesSegment([]record.RefSeries, int)
|
|
// Then SeriesReset is called to allow the deletion
|
|
// of all series created in a segment lower than the argument.
|
|
SeriesReset(int)
|
|
}
|
|
|
|
// Used to notify the watcher that data has been written so that it can read.
|
|
type WriteNotified interface {
|
|
Notify()
|
|
}
|
|
|
|
type WatcherMetrics struct {
|
|
recordsRead *prometheus.CounterVec
|
|
recordDecodeFails *prometheus.CounterVec
|
|
samplesSentPreTailing *prometheus.CounterVec
|
|
currentSegment *prometheus.GaugeVec
|
|
notificationsSkipped *prometheus.CounterVec
|
|
}
|
|
|
|
// Watcher watches the TSDB WAL for a given WriteTo.
|
|
type Watcher struct {
|
|
name string
|
|
writer WriteTo
|
|
logger log.Logger
|
|
walDir string
|
|
lastCheckpoint string
|
|
sendExemplars bool
|
|
sendHistograms bool
|
|
sendMetadata bool
|
|
metrics *WatcherMetrics
|
|
readerMetrics *LiveReaderMetrics
|
|
|
|
startTime time.Time
|
|
startTimestamp int64 // the start time as a Prometheus timestamp
|
|
sendSamples bool
|
|
|
|
recordsReadMetric *prometheus.CounterVec
|
|
recordDecodeFailsMetric prometheus.Counter
|
|
samplesSentPreTailing prometheus.Counter
|
|
currentSegmentMetric prometheus.Gauge
|
|
notificationsSkipped prometheus.Counter
|
|
|
|
readNotify chan struct{}
|
|
quit chan struct{}
|
|
done chan struct{}
|
|
|
|
// For testing, stop when we hit this segment.
|
|
MaxSegment int
|
|
}
|
|
|
|
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics {
|
|
m := &WatcherMetrics{
|
|
recordsRead: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "records_read_total",
|
|
Help: "Number of records read by the WAL watcher from the WAL.",
|
|
},
|
|
[]string{consumer, "type"},
|
|
),
|
|
recordDecodeFails: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "record_decode_failures_total",
|
|
Help: "Number of records read by the WAL watcher that resulted in an error when decoding.",
|
|
},
|
|
[]string{consumer},
|
|
),
|
|
samplesSentPreTailing: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "samples_sent_pre_tailing_total",
|
|
Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.",
|
|
},
|
|
[]string{consumer},
|
|
),
|
|
currentSegment: prometheus.NewGaugeVec(
|
|
prometheus.GaugeOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "current_segment",
|
|
Help: "Current segment the WAL watcher is reading records from.",
|
|
},
|
|
[]string{consumer},
|
|
),
|
|
notificationsSkipped: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "wal_watcher",
|
|
Name: "notifications_skipped_total",
|
|
Help: "The number of WAL write notifications that the Watcher has skipped due to already being in a WAL read routine.",
|
|
},
|
|
[]string{consumer},
|
|
),
|
|
}
|
|
|
|
if reg != nil {
|
|
reg.MustRegister(m.recordsRead)
|
|
reg.MustRegister(m.recordDecodeFails)
|
|
reg.MustRegister(m.samplesSentPreTailing)
|
|
reg.MustRegister(m.currentSegment)
|
|
reg.MustRegister(m.notificationsSkipped)
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
// NewWatcher creates a new WAL watcher for a given WriteTo.
|
|
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher {
|
|
if logger == nil {
|
|
logger = log.NewNopLogger()
|
|
}
|
|
return &Watcher{
|
|
logger: logger,
|
|
writer: writer,
|
|
metrics: metrics,
|
|
readerMetrics: readerMetrics,
|
|
walDir: filepath.Join(dir, "wal"),
|
|
name: name,
|
|
sendExemplars: sendExemplars,
|
|
sendHistograms: sendHistograms,
|
|
sendMetadata: sendMetadata,
|
|
|
|
readNotify: make(chan struct{}),
|
|
quit: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
|
MaxSegment: -1,
|
|
}
|
|
}
|
|
|
|
func (w *Watcher) Notify() {
|
|
select {
|
|
case w.readNotify <- struct{}{}:
|
|
return
|
|
default: // default so we can exit
|
|
// we don't need a buffered channel or any buffering since
|
|
// for each notification it recv's the watcher will read until EOF
|
|
w.notificationsSkipped.Inc()
|
|
}
|
|
}
|
|
|
|
func (w *Watcher) setMetrics() {
|
|
// Setup the WAL Watchers metrics. We do this here rather than in the
|
|
// constructor because of the ordering of creating Queue Managers's,
|
|
// stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig.
|
|
if w.metrics != nil {
|
|
w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name})
|
|
w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name)
|
|
w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name)
|
|
w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name)
|
|
w.notificationsSkipped = w.metrics.notificationsSkipped.WithLabelValues(w.name)
|
|
}
|
|
}
|
|
|
|
// Start the Watcher.
|
|
func (w *Watcher) Start() {
|
|
w.setMetrics()
|
|
level.Info(w.logger).Log("msg", "Starting WAL watcher", "queue", w.name)
|
|
|
|
go w.loop()
|
|
}
|
|
|
|
// Stop the Watcher.
|
|
func (w *Watcher) Stop() {
|
|
close(w.quit)
|
|
<-w.done
|
|
|
|
// Records read metric has series and samples.
|
|
if w.metrics != nil {
|
|
w.metrics.recordsRead.DeleteLabelValues(w.name, "series")
|
|
w.metrics.recordsRead.DeleteLabelValues(w.name, "samples")
|
|
w.metrics.recordDecodeFails.DeleteLabelValues(w.name)
|
|
w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name)
|
|
w.metrics.currentSegment.DeleteLabelValues(w.name)
|
|
}
|
|
|
|
level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name)
|
|
}
|
|
|
|
func (w *Watcher) loop() {
|
|
defer close(w.done)
|
|
|
|
// We may encounter failures processing the WAL; we should wait and retry.
|
|
for !isClosed(w.quit) {
|
|
w.SetStartTime(time.Now())
|
|
if err := w.Run(); err != nil {
|
|
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
|
|
}
|
|
|
|
select {
|
|
case <-w.quit:
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
// Run the watcher, which will tail the WAL until the quit channel is closed
|
|
// or an error case is hit.
|
|
func (w *Watcher) Run() error {
|
|
_, lastSegment, err := Segments(w.walDir)
|
|
if err != nil {
|
|
return fmt.Errorf("Segments: %w", err)
|
|
}
|
|
|
|
// We want to ensure this is false across iterations since
|
|
// Run will be called again if there was a failure to read the WAL.
|
|
w.sendSamples = false
|
|
|
|
level.Info(w.logger).Log("msg", "Replaying WAL", "queue", w.name)
|
|
|
|
// Backfill from the checkpoint first if it exists.
|
|
lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir)
|
|
if err != nil && !errors.Is(err, record.ErrNotFound) {
|
|
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
|
|
}
|
|
|
|
if err == nil {
|
|
if err = w.readCheckpoint(lastCheckpoint, (*Watcher).readSegment); err != nil {
|
|
return fmt.Errorf("readCheckpoint: %w", err)
|
|
}
|
|
}
|
|
w.lastCheckpoint = lastCheckpoint
|
|
|
|
currentSegment, err := w.findSegmentForIndex(checkpointIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
level.Debug(w.logger).Log("msg", "Tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment)
|
|
for !isClosed(w.quit) {
|
|
w.currentSegmentMetric.Set(float64(currentSegment))
|
|
|
|
// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
|
|
// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
|
|
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment)
|
|
if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil && !errors.Is(err, ErrIgnorable) {
|
|
return err
|
|
}
|
|
|
|
// For testing: stop when you hit a specific segment.
|
|
if currentSegment == w.MaxSegment {
|
|
return nil
|
|
}
|
|
|
|
currentSegment++
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// findSegmentForIndex finds the first segment greater than or equal to index.
|
|
func (w *Watcher) findSegmentForIndex(index int) (int, error) {
|
|
refs, err := listSegments(w.walDir)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
for _, r := range refs {
|
|
if r.index >= index {
|
|
return r.index, nil
|
|
}
|
|
}
|
|
|
|
return -1, errors.New("failed to find segment for index")
|
|
}
|
|
|
|
func (w *Watcher) readAndHandleError(r *LiveReader, segmentNum int, tail bool, size int64) error {
|
|
err := w.readSegment(r, segmentNum, tail)
|
|
|
|
// Ignore all errors reading to end of segment whilst replaying the WAL.
|
|
if !tail {
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
level.Warn(w.logger).Log("msg", "Ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err)
|
|
} else if r.Offset() != size {
|
|
level.Warn(w.logger).Log("msg", "Expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", r.Offset(), "size", size)
|
|
}
|
|
return ErrIgnorable
|
|
}
|
|
|
|
// Otherwise, when we are tailing, non-EOFs are fatal.
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Use tail true to indicate that the reader is currently on a segment that is
|
|
// actively being written to. If false, assume it's a full segment and we're
|
|
// replaying it on start to cache the series records.
|
|
func (w *Watcher) watch(segmentNum int, tail bool) error {
|
|
segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer segment.Close()
|
|
|
|
reader := NewLiveReader(w.logger, w.readerMetrics, segment)
|
|
|
|
size := int64(math.MaxInt64)
|
|
if !tail {
|
|
var err error
|
|
size, err = getSegmentSize(w.walDir, segmentNum)
|
|
if err != nil {
|
|
return fmt.Errorf("getSegmentSize: %w", err)
|
|
}
|
|
|
|
return w.readAndHandleError(reader, segmentNum, tail, size)
|
|
}
|
|
|
|
checkpointTicker := time.NewTicker(checkpointPeriod)
|
|
defer checkpointTicker.Stop()
|
|
|
|
segmentTicker := time.NewTicker(segmentCheckPeriod)
|
|
defer segmentTicker.Stop()
|
|
|
|
readTicker := time.NewTicker(readTimeout)
|
|
defer readTicker.Stop()
|
|
|
|
gcSem := make(chan struct{}, 1)
|
|
for {
|
|
select {
|
|
case <-w.quit:
|
|
return nil
|
|
|
|
case <-checkpointTicker.C:
|
|
// Periodically check if there is a new checkpoint so we can garbage
|
|
// collect labels. As this is considered an optimisation, we ignore
|
|
// errors during checkpoint processing. Doing the process asynchronously
|
|
// allows the current WAL segment to be processed while reading the
|
|
// checkpoint.
|
|
select {
|
|
case gcSem <- struct{}{}:
|
|
go func() {
|
|
defer func() {
|
|
<-gcSem
|
|
}()
|
|
if err := w.garbageCollectSeries(segmentNum); err != nil {
|
|
level.Warn(w.logger).Log("msg", "Error process checkpoint", "err", err)
|
|
}
|
|
}()
|
|
default:
|
|
// Currently doing a garbage collect, try again later.
|
|
}
|
|
|
|
// if a newer segment is produced, read the current one until the end and move on.
|
|
case <-segmentTicker.C:
|
|
_, last, err := Segments(w.walDir)
|
|
if err != nil {
|
|
return fmt.Errorf("Segments: %w", err)
|
|
}
|
|
|
|
if last > segmentNum {
|
|
return w.readAndHandleError(reader, segmentNum, tail, size)
|
|
}
|
|
continue
|
|
|
|
// we haven't read due to a notification in quite some time, try reading anyways
|
|
case <-readTicker.C:
|
|
level.Debug(w.logger).Log("msg", "Watcher is reading the WAL due to timeout, haven't received any write notifications recently", "timeout", readTimeout)
|
|
err := w.readAndHandleError(reader, segmentNum, tail, size)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// reset the ticker so we don't read too often
|
|
readTicker.Reset(readTimeout)
|
|
|
|
case <-w.readNotify:
|
|
err := w.readAndHandleError(reader, segmentNum, tail, size)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// reset the ticker so we don't read too often
|
|
readTicker.Reset(readTimeout)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Watcher) garbageCollectSeries(segmentNum int) error {
|
|
dir, _, err := LastCheckpoint(w.walDir)
|
|
if err != nil && !errors.Is(err, record.ErrNotFound) {
|
|
return fmt.Errorf("tsdb.LastCheckpoint: %w", err)
|
|
}
|
|
|
|
if dir == "" || dir == w.lastCheckpoint {
|
|
return nil
|
|
}
|
|
w.lastCheckpoint = dir
|
|
|
|
index, err := checkpointNum(dir)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing checkpoint filename: %w", err)
|
|
}
|
|
|
|
if index >= segmentNum {
|
|
level.Debug(w.logger).Log("msg", "Current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir)
|
|
return nil
|
|
}
|
|
|
|
level.Debug(w.logger).Log("msg", "New checkpoint detected", "new", dir, "currentSegment", segmentNum)
|
|
|
|
if err = w.readCheckpoint(dir, (*Watcher).readSegmentForGC); err != nil {
|
|
return fmt.Errorf("readCheckpoint: %w", err)
|
|
}
|
|
|
|
// Clear series with a checkpoint or segment index # lower than the checkpoint we just read.
|
|
w.writer.SeriesReset(index)
|
|
return nil
|
|
}
|
|
|
|
// Read from a segment and pass the details to w.writer.
|
|
// Also used with readCheckpoint - implements segmentReadFn.
|
|
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
|
|
var (
|
|
dec = record.NewDecoder(labels.NewSymbolTable()) // One table per WAL segment means it won't grow indefinitely.
|
|
series []record.RefSeries
|
|
samples []record.RefSample
|
|
samplesToSend []record.RefSample
|
|
exemplars []record.RefExemplar
|
|
histograms []record.RefHistogramSample
|
|
histogramsToSend []record.RefHistogramSample
|
|
floatHistograms []record.RefFloatHistogramSample
|
|
floatHistogramsToSend []record.RefFloatHistogramSample
|
|
metadata []record.RefMetadata
|
|
)
|
|
for r.Next() && !isClosed(w.quit) {
|
|
rec := r.Record()
|
|
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
|
|
|
|
switch dec.Type(rec) {
|
|
case record.Series:
|
|
series, err := dec.Series(rec, series[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
w.writer.StoreSeries(series, segmentNum)
|
|
|
|
case record.Samples:
|
|
// If we're not tailing a segment we can ignore any samples records we see.
|
|
// This speeds up replay of the WAL by > 10x.
|
|
if !tail {
|
|
break
|
|
}
|
|
samples, err := dec.Samples(rec, samples[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
for _, s := range samples {
|
|
if s.T > w.startTimestamp {
|
|
if !w.sendSamples {
|
|
w.sendSamples = true
|
|
duration := time.Since(w.startTime)
|
|
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
|
|
}
|
|
samplesToSend = append(samplesToSend, s)
|
|
}
|
|
}
|
|
if len(samplesToSend) > 0 {
|
|
w.writer.Append(samplesToSend)
|
|
samplesToSend = samplesToSend[:0]
|
|
}
|
|
|
|
case record.Exemplars:
|
|
// Skip if experimental "exemplars over remote write" is not enabled.
|
|
if !w.sendExemplars {
|
|
break
|
|
}
|
|
// If we're not tailing a segment we can ignore any exemplars records we see.
|
|
// This speeds up replay of the WAL significantly.
|
|
if !tail {
|
|
break
|
|
}
|
|
exemplars, err := dec.Exemplars(rec, exemplars[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
w.writer.AppendExemplars(exemplars)
|
|
|
|
case record.HistogramSamples:
|
|
// Skip if experimental "histograms over remote write" is not enabled.
|
|
if !w.sendHistograms {
|
|
break
|
|
}
|
|
if !tail {
|
|
break
|
|
}
|
|
histograms, err := dec.HistogramSamples(rec, histograms[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
for _, h := range histograms {
|
|
if h.T > w.startTimestamp {
|
|
if !w.sendSamples {
|
|
w.sendSamples = true
|
|
duration := time.Since(w.startTime)
|
|
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
|
|
}
|
|
histogramsToSend = append(histogramsToSend, h)
|
|
}
|
|
}
|
|
if len(histogramsToSend) > 0 {
|
|
w.writer.AppendHistograms(histogramsToSend)
|
|
histogramsToSend = histogramsToSend[:0]
|
|
}
|
|
case record.FloatHistogramSamples:
|
|
// Skip if experimental "histograms over remote write" is not enabled.
|
|
if !w.sendHistograms {
|
|
break
|
|
}
|
|
if !tail {
|
|
break
|
|
}
|
|
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
for _, fh := range floatHistograms {
|
|
if fh.T > w.startTimestamp {
|
|
if !w.sendSamples {
|
|
w.sendSamples = true
|
|
duration := time.Since(w.startTime)
|
|
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
|
|
}
|
|
floatHistogramsToSend = append(floatHistogramsToSend, fh)
|
|
}
|
|
}
|
|
if len(floatHistogramsToSend) > 0 {
|
|
w.writer.AppendFloatHistograms(floatHistogramsToSend)
|
|
floatHistogramsToSend = floatHistogramsToSend[:0]
|
|
}
|
|
|
|
case record.Metadata:
|
|
if !w.sendMetadata || !tail {
|
|
break
|
|
}
|
|
meta, err := dec.Metadata(rec, metadata[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
w.writer.StoreMetadata(meta)
|
|
case record.Tombstones:
|
|
|
|
default:
|
|
// Could be corruption, or reading from a WAL from a newer Prometheus.
|
|
w.recordDecodeFailsMetric.Inc()
|
|
}
|
|
}
|
|
if err := r.Err(); err != nil {
|
|
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Go through all series in a segment updating the segmentNum, so we can delete older series.
|
|
// Used with readCheckpoint - implements segmentReadFn.
|
|
func (w *Watcher) readSegmentForGC(r *LiveReader, segmentNum int, _ bool) error {
|
|
var (
|
|
dec = record.NewDecoder(labels.NewSymbolTable()) // Needed for decoding; labels do not outlive this function.
|
|
series []record.RefSeries
|
|
)
|
|
for r.Next() && !isClosed(w.quit) {
|
|
rec := r.Record()
|
|
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
|
|
|
|
switch dec.Type(rec) {
|
|
case record.Series:
|
|
series, err := dec.Series(rec, series[:0])
|
|
if err != nil {
|
|
w.recordDecodeFailsMetric.Inc()
|
|
return err
|
|
}
|
|
w.writer.UpdateSeriesSegment(series, segmentNum)
|
|
|
|
// Ignore these; we're only interested in series.
|
|
case record.Samples:
|
|
case record.Exemplars:
|
|
case record.Tombstones:
|
|
|
|
default:
|
|
// Could be corruption, or reading from a WAL from a newer Prometheus.
|
|
w.recordDecodeFailsMetric.Inc()
|
|
}
|
|
}
|
|
if err := r.Err(); err != nil {
|
|
return fmt.Errorf("segment %d: %w", segmentNum, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Watcher) SetStartTime(t time.Time) {
|
|
w.startTime = t
|
|
w.startTimestamp = timestamp.FromTime(t)
|
|
}
|
|
|
|
type segmentReadFn func(w *Watcher, r *LiveReader, segmentNum int, tail bool) error
|
|
|
|
// Read all the series records from a Checkpoint directory.
|
|
func (w *Watcher) readCheckpoint(checkpointDir string, readFn segmentReadFn) error {
|
|
level.Debug(w.logger).Log("msg", "Reading checkpoint", "dir", checkpointDir)
|
|
index, err := checkpointNum(checkpointDir)
|
|
if err != nil {
|
|
return fmt.Errorf("checkpointNum: %w", err)
|
|
}
|
|
|
|
// Ensure we read the whole contents of every segment in the checkpoint dir.
|
|
segs, err := listSegments(checkpointDir)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to get segments checkpoint dir: %w", err)
|
|
}
|
|
for _, segRef := range segs {
|
|
size, err := getSegmentSize(checkpointDir, segRef.index)
|
|
if err != nil {
|
|
return fmt.Errorf("getSegmentSize: %w", err)
|
|
}
|
|
|
|
sr, err := OpenReadSegment(SegmentName(checkpointDir, segRef.index))
|
|
if err != nil {
|
|
return fmt.Errorf("unable to open segment: %w", err)
|
|
}
|
|
defer sr.Close()
|
|
|
|
r := NewLiveReader(w.logger, w.readerMetrics, sr)
|
|
if err := readFn(w, r, index, false); err != nil && !errors.Is(err, io.EOF) {
|
|
return fmt.Errorf("readSegment: %w", err)
|
|
}
|
|
|
|
if r.Offset() != size {
|
|
return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, segRef.index, size, r.Offset())
|
|
}
|
|
}
|
|
|
|
level.Debug(w.logger).Log("msg", "Read series references from checkpoint", "checkpoint", checkpointDir)
|
|
return nil
|
|
}
|
|
|
|
func checkpointNum(dir string) (int, error) {
|
|
// Checkpoint dir names are in the format checkpoint.000001
|
|
// dir may contain a hidden directory, so only check the base directory
|
|
chunks := strings.Split(filepath.Base(dir), ".")
|
|
if len(chunks) != 2 {
|
|
return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
|
|
}
|
|
|
|
result, err := strconv.Atoi(chunks[1])
|
|
if err != nil {
|
|
return 0, fmt.Errorf("invalid checkpoint dir string: %s", dir)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Get size of segment.
|
|
func getSegmentSize(dir string, index int) (int64, error) {
|
|
i := int64(-1)
|
|
fi, err := os.Stat(SegmentName(dir, index))
|
|
if err == nil {
|
|
i = fi.Size()
|
|
}
|
|
return i, err
|
|
}
|
|
|
|
func isClosed(c chan struct{}) bool {
|
|
select {
|
|
case <-c:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|