mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-10 05:17:28 -08:00
d1122e0743
* Append metadata to the WAL Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Remove extra whitespace; Reword some docstrings and comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use RLock() for hasNewMetadata check Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use single byte for metric type in RefMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Update proposed WAL format for single-byte type metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Implementa MetadataAppender interface for the Agent Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Address first round of review comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Amend description of metadata in wal.md Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Correct key used to retrieve metadata from cache When we're setting metadata entries in the scrapeCace, we're using the p.Help(), p.Unit(), p.Type() helpers, which retrieve the series name and use it as the cache key. When checking for cache entries though, we used p.Series() as the key, which included the metric name _with_ its labels. That meant that we were never actually hitting the cache. We're fixing this by utiling the __name__ internal label for correctly getting the cache entries after they've been set by setHelp(), setType() or setUnit(). 
Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Put feature behind a feature flag Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix AppendMetadata docstring Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reorder WAL format document Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Change error message of AppendMetadata; Fix access of s.meta in AppendMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reuse temporary buffer in Metadata encoder Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Only keep latest metadata for each refID during checkpointing Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix test that's referencing decoding metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Avoid creating metadata block if no new metadata are present Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add tests for corrupt metadata block and relevant record type Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix CR comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Extract logic about changing metadata in an anonymous function Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Implement new proposed WAL format and amend relevant tests Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Use 'const' for metadata field names Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Apply metadata to head memSeries in Commit, not in AppendMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add docstring and rename extracted helper in scrape.go Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add tests for tsdb-related cases Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linter issues vol1 Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linter issues vol2 Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix Windows test by closing WAL reader files Signed-off-by: 
Paschalis Tsilias <paschalist0@gmail.com> * Use switch instead of two if statements in metadata decoding Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix review comments around TestMetadata* tests Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Add code for replaying WAL; test correctness of in-memory data after a replay Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Remove scrape-loop related code from PR Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Address first round of comments Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Simplify tests by sorting slices before comparison Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix test to use separate transactions Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Empty out buffer and record slices after encoding latest metadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix linting issue Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Update calculation for DroppedMetadata metric Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Rename MetadataAppender interface and AppendMetadata method to MetadataUpdater/UpdateMetadata Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Reuse buffer when encoding latest metadata for each series Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Fix review comments; Check all returned error values using two helpers Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Simplify use of helpers Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com> * Satisfy linter Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
874 lines
25 KiB
Go
874 lines
25 KiB
Go
// Copyright 2021 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package agent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
"unicode/utf8"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/metadata"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/remote"
|
|
"github.com/prometheus/prometheus/tsdb"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/record"
|
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
|
"github.com/prometheus/prometheus/tsdb/wal"
|
|
)
|
|
|
|
// ErrUnsupported is returned by the query methods of DB, which the WAL-only
// storage does not implement.
var ErrUnsupported = errors.New("unsupported operation with WAL-only storage")
|
|
|
|
// Defaults used when an option is left unset. The WAL time bounds are
// expressed in milliseconds.
var (
	DefaultTruncateFrequency = 2 * time.Hour
	DefaultMinWALTime        = (5 * time.Minute).Milliseconds()
	DefaultMaxWALTime        = (4 * time.Hour).Milliseconds()
)
|
|
|
|
// Options configures the WAL storage.
type Options struct {
	// WALSegmentSize is the maximum size of a WAL segment file. Values <= 0
	// select the default segment size; positive values are used as given.
	WALSegmentSize int

	// WALCompression turns on Snappy compression for records on the WAL.
	WALCompression bool

	// StripeSize is the size (a power of 2) in entries of the series hash
	// map. A smaller size saves memory at the cost of performance.
	StripeSize int

	// TruncateFrequency determines how frequently data is truncated from the WAL.
	TruncateFrequency time.Duration

	// MinWALTime is the shortest amount of time data can exist in the WAL
	// before being deleted.
	MinWALTime int64
	// MaxWALTime is the longest amount of time data can exist in the WAL
	// before being deleted.
	MaxWALTime int64

	// NoLockfile disables creation and consideration of a lock file.
	NoLockfile bool
}
|
|
|
|
// DefaultOptions used for the WAL storage. They are sane for setups using
|
|
// millisecond-precision timestamps.
|
|
func DefaultOptions() *Options {
|
|
return &Options{
|
|
WALSegmentSize: wal.DefaultSegmentSize,
|
|
WALCompression: false,
|
|
StripeSize: tsdb.DefaultStripeSize,
|
|
TruncateFrequency: DefaultTruncateFrequency,
|
|
MinWALTime: DefaultMinWALTime,
|
|
MaxWALTime: DefaultMaxWALTime,
|
|
NoLockfile: false,
|
|
}
|
|
}
|
|
|
|
type dbMetrics struct {
|
|
r prometheus.Registerer
|
|
|
|
numActiveSeries prometheus.Gauge
|
|
numWALSeriesPendingDeletion prometheus.Gauge
|
|
totalAppendedSamples prometheus.Counter
|
|
totalAppendedExemplars prometheus.Counter
|
|
totalOutOfOrderSamples prometheus.Counter
|
|
walTruncateDuration prometheus.Summary
|
|
walCorruptionsTotal prometheus.Counter
|
|
walTotalReplayDuration prometheus.Gauge
|
|
checkpointDeleteFail prometheus.Counter
|
|
checkpointDeleteTotal prometheus.Counter
|
|
checkpointCreationFail prometheus.Counter
|
|
checkpointCreationTotal prometheus.Counter
|
|
}
|
|
|
|
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
|
m := dbMetrics{r: r}
|
|
m.numActiveSeries = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "prometheus_agent_active_series",
|
|
Help: "Number of active series being tracked by the WAL storage",
|
|
})
|
|
|
|
m.numWALSeriesPendingDeletion = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "prometheus_agent_deleted_series",
|
|
Help: "Number of series pending deletion from the WAL",
|
|
})
|
|
|
|
m.totalAppendedSamples = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_samples_appended_total",
|
|
Help: "Total number of samples appended to the storage",
|
|
})
|
|
|
|
m.totalAppendedExemplars = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_exemplars_appended_total",
|
|
Help: "Total number of exemplars appended to the storage",
|
|
})
|
|
|
|
m.totalOutOfOrderSamples = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_out_of_order_samples_total",
|
|
Help: "Total number of out of order samples ingestion failed attempts.",
|
|
})
|
|
|
|
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
|
Name: "prometheus_agent_truncate_duration_seconds",
|
|
Help: "Duration of WAL truncation.",
|
|
})
|
|
|
|
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_corruptions_total",
|
|
Help: "Total number of WAL corruptions.",
|
|
})
|
|
|
|
m.walTotalReplayDuration = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "prometheus_agent_data_replay_duration_seconds",
|
|
Help: "Time taken to replay the data on disk.",
|
|
})
|
|
|
|
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_checkpoint_deletions_failed_total",
|
|
Help: "Total number of checkpoint deletions that failed.",
|
|
})
|
|
|
|
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_checkpoint_deletions_total",
|
|
Help: "Total number of checkpoint deletions attempted.",
|
|
})
|
|
|
|
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_checkpoint_creations_failed_total",
|
|
Help: "Total number of checkpoint creations that failed.",
|
|
})
|
|
|
|
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "prometheus_agent_checkpoint_creations_total",
|
|
Help: "Total number of checkpoint creations attempted.",
|
|
})
|
|
|
|
if r != nil {
|
|
r.MustRegister(
|
|
m.numActiveSeries,
|
|
m.numWALSeriesPendingDeletion,
|
|
m.totalAppendedSamples,
|
|
m.totalAppendedExemplars,
|
|
m.totalOutOfOrderSamples,
|
|
m.walTruncateDuration,
|
|
m.walCorruptionsTotal,
|
|
m.walTotalReplayDuration,
|
|
m.checkpointDeleteFail,
|
|
m.checkpointDeleteTotal,
|
|
m.checkpointCreationFail,
|
|
m.checkpointCreationTotal,
|
|
)
|
|
}
|
|
|
|
return &m
|
|
}
|
|
|
|
func (m *dbMetrics) Unregister() {
|
|
if m.r == nil {
|
|
return
|
|
}
|
|
cs := []prometheus.Collector{
|
|
m.numActiveSeries,
|
|
m.numWALSeriesPendingDeletion,
|
|
m.totalAppendedSamples,
|
|
m.totalAppendedExemplars,
|
|
m.totalOutOfOrderSamples,
|
|
m.walTruncateDuration,
|
|
m.walCorruptionsTotal,
|
|
m.walTotalReplayDuration,
|
|
m.checkpointDeleteFail,
|
|
m.checkpointDeleteTotal,
|
|
m.checkpointCreationFail,
|
|
m.checkpointCreationTotal,
|
|
}
|
|
for _, c := range cs {
|
|
m.r.Unregister(c)
|
|
}
|
|
}
|
|
|
|
// DB represents a WAL-only storage. It implements storage.DB.
|
|
type DB struct {
|
|
mtx sync.RWMutex
|
|
logger log.Logger
|
|
opts *Options
|
|
rs *remote.Storage
|
|
|
|
wal *wal.WAL
|
|
locker *tsdbutil.DirLocker
|
|
|
|
appenderPool sync.Pool
|
|
bufPool sync.Pool
|
|
|
|
nextRef *atomic.Uint64
|
|
series *stripeSeries
|
|
// deleted is a map of (ref IDs that should be deleted from WAL) to (the WAL segment they
|
|
// must be kept around to).
|
|
deleted map[chunks.HeadSeriesRef]int
|
|
|
|
donec chan struct{}
|
|
stopc chan struct{}
|
|
|
|
metrics *dbMetrics
|
|
}
|
|
|
|
// Open returns a new agent.DB in the given directory.
|
|
func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir string, opts *Options) (*DB, error) {
|
|
opts = validateOptions(opts)
|
|
|
|
locker, err := tsdbutil.NewDirLocker(dir, "agent", l, reg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !opts.NoLockfile {
|
|
if err := locker.Lock(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
|
|
dir = filepath.Join(dir, "wal")
|
|
|
|
w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "creating WAL")
|
|
}
|
|
|
|
db := &DB{
|
|
logger: l,
|
|
opts: opts,
|
|
rs: rs,
|
|
|
|
wal: w,
|
|
locker: locker,
|
|
|
|
nextRef: atomic.NewUint64(0),
|
|
series: newStripeSeries(opts.StripeSize),
|
|
deleted: make(map[chunks.HeadSeriesRef]int),
|
|
|
|
donec: make(chan struct{}),
|
|
stopc: make(chan struct{}),
|
|
|
|
metrics: newDBMetrics(reg),
|
|
}
|
|
|
|
db.bufPool.New = func() interface{} {
|
|
return make([]byte, 0, 1024)
|
|
}
|
|
|
|
db.appenderPool.New = func() interface{} {
|
|
return &appender{
|
|
DB: db,
|
|
pendingSeries: make([]record.RefSeries, 0, 100),
|
|
pendingSamples: make([]record.RefSample, 0, 100),
|
|
pendingExamplars: make([]record.RefExemplar, 0, 10),
|
|
}
|
|
}
|
|
|
|
if err := db.replayWAL(); err != nil {
|
|
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
|
|
if err := w.Repair(err); err != nil {
|
|
return nil, errors.Wrap(err, "repair corrupted WAL")
|
|
}
|
|
}
|
|
|
|
go db.run()
|
|
return db, nil
|
|
}
|
|
|
|
func validateOptions(opts *Options) *Options {
|
|
if opts == nil {
|
|
opts = DefaultOptions()
|
|
}
|
|
if opts.WALSegmentSize <= 0 {
|
|
opts.WALSegmentSize = wal.DefaultSegmentSize
|
|
}
|
|
|
|
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
|
|
if opts.StripeSize <= 0 || ((opts.StripeSize & (opts.StripeSize - 1)) != 0) {
|
|
opts.StripeSize = tsdb.DefaultStripeSize
|
|
}
|
|
if opts.TruncateFrequency <= 0 {
|
|
opts.TruncateFrequency = DefaultTruncateFrequency
|
|
}
|
|
if opts.MinWALTime <= 0 {
|
|
opts.MinWALTime = 0
|
|
}
|
|
if opts.MaxWALTime <= 0 {
|
|
opts.MaxWALTime = DefaultMaxWALTime
|
|
}
|
|
|
|
if t := int64(opts.TruncateFrequency * time.Hour / time.Millisecond); opts.MaxWALTime < t {
|
|
opts.MaxWALTime = t
|
|
}
|
|
return opts
|
|
}
|
|
|
|
func (db *DB) replayWAL() error {
|
|
level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir())
|
|
start := time.Now()
|
|
|
|
dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir())
|
|
if err != nil && err != record.ErrNotFound {
|
|
return errors.Wrap(err, "find last checkpoint")
|
|
}
|
|
|
|
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
|
|
|
|
if err == nil {
|
|
sr, err := wal.NewSegmentsReader(dir)
|
|
if err != nil {
|
|
return errors.Wrap(err, "open checkpoint")
|
|
}
|
|
defer func() {
|
|
if err := sr.Close(); err != nil {
|
|
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
|
}
|
|
}()
|
|
|
|
// A corrupted checkpoint is a hard error for now and requires user
|
|
// intervention. There's likely little data that can be recovered anyway.
|
|
if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil {
|
|
return errors.Wrap(err, "backfill checkpoint")
|
|
}
|
|
startFrom++
|
|
level.Info(db.logger).Log("msg", "WAL checkpoint loaded")
|
|
}
|
|
|
|
// Find the last segment.
|
|
_, last, err := wal.Segments(db.wal.Dir())
|
|
if err != nil {
|
|
return errors.Wrap(err, "finding WAL segments")
|
|
}
|
|
|
|
// Backfil segments from the most recent checkpoint onwards.
|
|
for i := startFrom; i <= last; i++ {
|
|
seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i))
|
|
if err != nil {
|
|
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
|
|
}
|
|
|
|
sr := wal.NewSegmentBufReader(seg)
|
|
err = db.loadWAL(wal.NewReader(sr), multiRef)
|
|
if err := sr.Close(); err != nil {
|
|
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
level.Info(db.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
|
|
}
|
|
|
|
walReplayDuration := time.Since(start)
|
|
db.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
|
|
var (
|
|
dec record.Decoder
|
|
lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
|
|
|
|
decoded = make(chan interface{}, 10)
|
|
errCh = make(chan error, 1)
|
|
seriesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefSeries{}
|
|
},
|
|
}
|
|
samplesPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return []record.RefSample{}
|
|
},
|
|
}
|
|
)
|
|
|
|
go func() {
|
|
defer close(decoded)
|
|
var err error
|
|
for r.Next() {
|
|
rec := r.Record()
|
|
switch dec.Type(rec) {
|
|
case record.Series:
|
|
series := seriesPool.Get().([]record.RefSeries)[:0]
|
|
series, err = dec.Series(rec, series)
|
|
if err != nil {
|
|
errCh <- &wal.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode series"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- series
|
|
case record.Samples:
|
|
samples := samplesPool.Get().([]record.RefSample)[:0]
|
|
samples, err = dec.Samples(rec, samples)
|
|
if err != nil {
|
|
errCh <- &wal.CorruptionErr{
|
|
Err: errors.Wrap(err, "decode samples"),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
return
|
|
}
|
|
decoded <- samples
|
|
case record.Tombstones, record.Exemplars:
|
|
// We don't care about tombstones or exemplars during replay.
|
|
// TODO: If decide to decode exemplars, we should make sure to prepopulate
|
|
// stripeSeries.exemplars in the next block by using setLatestExemplar.
|
|
continue
|
|
default:
|
|
errCh <- &wal.CorruptionErr{
|
|
Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
|
|
Segment: r.Segment(),
|
|
Offset: r.Offset(),
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
var nonExistentSeriesRefs atomic.Uint64
|
|
|
|
for d := range decoded {
|
|
switch v := d.(type) {
|
|
case []record.RefSeries:
|
|
for _, entry := range v {
|
|
// If this is a new series, create it in memory. If we never read in a
|
|
// sample for this series, its timestamp will remain at 0 and it will
|
|
// be deleted at the next GC.
|
|
if db.series.GetByID(entry.Ref) == nil {
|
|
series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
|
|
db.series.Set(entry.Labels.Hash(), series)
|
|
multiRef[entry.Ref] = series.ref
|
|
db.metrics.numActiveSeries.Inc()
|
|
if entry.Ref > lastRef {
|
|
lastRef = entry.Ref
|
|
}
|
|
}
|
|
}
|
|
|
|
//nolint:staticcheck
|
|
seriesPool.Put(v)
|
|
case []record.RefSample:
|
|
for _, entry := range v {
|
|
// Update the lastTs for the series based
|
|
ref, ok := multiRef[entry.Ref]
|
|
if !ok {
|
|
nonExistentSeriesRefs.Inc()
|
|
continue
|
|
}
|
|
series := db.series.GetByID(ref)
|
|
if entry.T > series.lastTs {
|
|
series.lastTs = entry.T
|
|
}
|
|
}
|
|
|
|
//nolint:staticcheck
|
|
samplesPool.Put(v)
|
|
default:
|
|
panic(fmt.Errorf("unexpected decoded type: %T", d))
|
|
}
|
|
}
|
|
|
|
if v := nonExistentSeriesRefs.Load(); v > 0 {
|
|
level.Warn(db.logger).Log("msg", "found sample referencing non-existing series", "skipped_series", v)
|
|
}
|
|
|
|
db.nextRef.Store(uint64(lastRef))
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
return err
|
|
default:
|
|
if r.Err() != nil {
|
|
return errors.Wrap(r.Err(), "read records")
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (db *DB) run() {
|
|
defer close(db.donec)
|
|
|
|
Loop:
|
|
for {
|
|
select {
|
|
case <-db.stopc:
|
|
break Loop
|
|
case <-time.After(db.opts.TruncateFrequency):
|
|
// The timestamp ts is used to determine which series are not receiving
|
|
// samples and may be deleted from the WAL. Their most recent append
|
|
// timestamp is compared to ts, and if that timestamp is older then ts,
|
|
// they are considered inactive and may be deleted.
|
|
//
|
|
// Subtracting a duration from ts will add a buffer for when series are
|
|
// considered inactive and safe for deletion.
|
|
ts := db.rs.LowestSentTimestamp() - db.opts.MinWALTime
|
|
if ts < 0 {
|
|
ts = 0
|
|
}
|
|
|
|
// Network issues can prevent the result of getRemoteWriteTimestamp from
|
|
// changing. We don't want data in the WAL to grow forever, so we set a cap
|
|
// on the maximum age data can be. If our ts is older than this cutoff point,
|
|
// we'll shift it forward to start deleting very stale data.
|
|
if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {
|
|
ts = maxTS
|
|
}
|
|
|
|
level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
|
|
if err := db.truncate(ts); err != nil {
|
|
level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (db *DB) truncate(mint int64) error {
|
|
db.mtx.RLock()
|
|
defer db.mtx.RUnlock()
|
|
|
|
start := time.Now()
|
|
|
|
db.gc(mint)
|
|
level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start))
|
|
|
|
first, last, err := wal.Segments(db.wal.Dir())
|
|
if err != nil {
|
|
return errors.Wrap(err, "get segment range")
|
|
}
|
|
|
|
// Start a new segment so low ingestion volume instances don't have more WAL
|
|
// than needed.
|
|
err = db.wal.NextSegment()
|
|
if err != nil {
|
|
return errors.Wrap(err, "next segment")
|
|
}
|
|
|
|
last-- // Never consider most recent segment for checkpoint
|
|
if last < 0 {
|
|
return nil // no segments yet
|
|
}
|
|
|
|
// The lower two-thirds of segments should contain mostly obsolete samples.
|
|
// If we have less than two segments, it's not worth checkpointing yet.
|
|
last = first + (last-first)*2/3
|
|
if last <= first {
|
|
return nil
|
|
}
|
|
|
|
keep := func(id chunks.HeadSeriesRef) bool {
|
|
if db.series.GetByID(id) != nil {
|
|
return true
|
|
}
|
|
|
|
seg, ok := db.deleted[id]
|
|
return ok && seg >= first
|
|
}
|
|
|
|
db.metrics.checkpointCreationTotal.Inc()
|
|
|
|
if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
|
|
db.metrics.checkpointCreationFail.Inc()
|
|
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
|
|
db.metrics.walCorruptionsTotal.Inc()
|
|
}
|
|
return errors.Wrap(err, "create checkpoint")
|
|
}
|
|
if err := db.wal.Truncate(last + 1); err != nil {
|
|
// If truncating fails, we'll just try it again at the next checkpoint.
|
|
// Leftover segments will still just be ignored in the future if there's a
|
|
// checkpoint that supersedes them.
|
|
level.Error(db.logger).Log("msg", "truncating segments failed", "err", err)
|
|
}
|
|
|
|
// The checkpoint is written and segments before it are truncated, so we
|
|
// no longer need to track deleted series that were being kept around.
|
|
for ref, segment := range db.deleted {
|
|
if segment < first {
|
|
delete(db.deleted, ref)
|
|
}
|
|
}
|
|
db.metrics.checkpointDeleteTotal.Inc()
|
|
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
|
|
|
|
if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil {
|
|
// Leftover old checkpoints do not cause problems down the line beyond
|
|
// occupying disk space. They will just be ignored since a newer checkpoint
|
|
// exists.
|
|
level.Error(db.logger).Log("msg", "delete old checkpoints", "err", err)
|
|
db.metrics.checkpointDeleteFail.Inc()
|
|
}
|
|
|
|
db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
|
|
|
|
level.Info(db.logger).Log("msg", "WAL checkpoint complete", "first", first, "last", last, "duration", time.Since(start))
|
|
return nil
|
|
}
|
|
|
|
// gc marks ref IDs that have not received a sample since mint as deleted in
|
|
// s.deleted, along with the segment where they originally got deleted.
|
|
func (db *DB) gc(mint int64) {
|
|
deleted := db.series.GC(mint)
|
|
db.metrics.numActiveSeries.Sub(float64(len(deleted)))
|
|
|
|
_, last, _ := wal.Segments(db.wal.Dir())
|
|
|
|
// We want to keep series records for any newly deleted series
|
|
// until we've passed the last recorded segment. This prevents
|
|
// the WAL having samples for series records that no longer exist.
|
|
for ref := range deleted {
|
|
db.deleted[ref] = last
|
|
}
|
|
|
|
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
|
|
}
|
|
|
|
// StartTime implements the Storage interface.
|
|
func (db *DB) StartTime() (int64, error) {
|
|
return int64(model.Latest), nil
|
|
}
|
|
|
|
// Querier implements the Storage interface.
|
|
func (db *DB) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
|
return nil, ErrUnsupported
|
|
}
|
|
|
|
// ChunkQuerier implements the Storage interface.
|
|
func (db *DB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
|
return nil, ErrUnsupported
|
|
}
|
|
|
|
// ExemplarQuerier implements the Storage interface.
|
|
func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
|
|
return nil, ErrUnsupported
|
|
}
|
|
|
|
// Appender implements storage.Storage.
|
|
func (db *DB) Appender(_ context.Context) storage.Appender {
|
|
return db.appenderPool.Get().(storage.Appender)
|
|
}
|
|
|
|
// Close implements the Storage interface.
|
|
func (db *DB) Close() error {
|
|
db.mtx.Lock()
|
|
defer db.mtx.Unlock()
|
|
|
|
close(db.stopc)
|
|
<-db.donec
|
|
|
|
db.metrics.Unregister()
|
|
|
|
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
|
|
}
|
|
|
|
type appender struct {
|
|
*DB
|
|
|
|
pendingSeries []record.RefSeries
|
|
pendingSamples []record.RefSample
|
|
pendingExamplars []record.RefExemplar
|
|
|
|
// Pointers to the series referenced by each element of pendingSamples.
|
|
// Series lock is not held on elements.
|
|
sampleSeries []*memSeries
|
|
}
|
|
|
|
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
|
// series references and chunk references are identical for agent mode.
|
|
headRef := chunks.HeadSeriesRef(ref)
|
|
|
|
series := a.series.GetByID(headRef)
|
|
if series == nil {
|
|
// Ensure no empty or duplicate labels have gotten through. This mirrors the
|
|
// equivalent validation code in the TSDB's headAppender.
|
|
l = l.WithoutEmpty()
|
|
if len(l) == 0 {
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset")
|
|
}
|
|
|
|
if lbl, dup := l.HasDuplicateLabelNames(); dup {
|
|
return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl))
|
|
}
|
|
|
|
var created bool
|
|
series, created = a.getOrCreate(l)
|
|
if created {
|
|
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
|
|
Ref: series.ref,
|
|
Labels: l,
|
|
})
|
|
|
|
a.metrics.numActiveSeries.Inc()
|
|
}
|
|
}
|
|
|
|
series.Lock()
|
|
defer series.Unlock()
|
|
|
|
if t < series.lastTs {
|
|
a.metrics.totalOutOfOrderSamples.Inc()
|
|
return 0, storage.ErrOutOfOrderSample
|
|
}
|
|
|
|
// NOTE: always modify pendingSamples and sampleSeries together
|
|
a.pendingSamples = append(a.pendingSamples, record.RefSample{
|
|
Ref: series.ref,
|
|
T: t,
|
|
V: v,
|
|
})
|
|
a.sampleSeries = append(a.sampleSeries, series)
|
|
|
|
a.metrics.totalAppendedSamples.Inc()
|
|
return storage.SeriesRef(series.ref), nil
|
|
}
|
|
|
|
func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
|
|
hash := l.Hash()
|
|
|
|
series = a.series.GetByHash(hash, l)
|
|
if series != nil {
|
|
return series, false
|
|
}
|
|
|
|
ref := chunks.HeadSeriesRef(a.nextRef.Inc())
|
|
series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64}
|
|
a.series.Set(hash, series)
|
|
return series, true
|
|
}
|
|
|
|
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
|
// series references and chunk references are identical for agent mode.
|
|
headRef := chunks.HeadSeriesRef(ref)
|
|
|
|
s := a.series.GetByID(headRef)
|
|
if s == nil {
|
|
return 0, fmt.Errorf("unknown series ref when trying to add exemplar: %d", ref)
|
|
}
|
|
|
|
// Ensure no empty labels have gotten through.
|
|
e.Labels = e.Labels.WithoutEmpty()
|
|
|
|
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
|
|
return 0, errors.Wrap(tsdb.ErrInvalidExemplar, fmt.Sprintf(`label name "%s" is not unique`, lbl))
|
|
}
|
|
|
|
// Exemplar label length does not include chars involved in text rendering such as quotes
|
|
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
|
|
labelSetLen := 0
|
|
for _, l := range e.Labels {
|
|
labelSetLen += utf8.RuneCountInString(l.Name)
|
|
labelSetLen += utf8.RuneCountInString(l.Value)
|
|
|
|
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
|
|
return 0, storage.ErrExemplarLabelLength
|
|
}
|
|
}
|
|
|
|
// Check for duplicate vs last stored exemplar for this series, and discard those.
|
|
// Otherwise, record the current exemplar as the latest.
|
|
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
|
|
prevExemplar := a.series.GetLatestExemplar(s.ref)
|
|
if prevExemplar != nil && prevExemplar.Equals(e) {
|
|
// Duplicate, don't return an error but don't accept the exemplar.
|
|
return 0, nil
|
|
}
|
|
a.series.SetLatestExemplar(s.ref, &e)
|
|
|
|
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
|
|
Ref: s.ref,
|
|
T: e.Ts,
|
|
V: e.Value,
|
|
Labels: e.Labels,
|
|
})
|
|
|
|
a.metrics.totalAppendedExemplars.Inc()
|
|
return storage.SeriesRef(s.ref), nil
|
|
}
|
|
|
|
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
|
|
// TODO: Wire metadata in the Agent's appender.
|
|
return 0, nil
|
|
}
|
|
|
|
// Commit submits the collected samples and purges the batch.
|
|
func (a *appender) Commit() error {
|
|
a.mtx.RLock()
|
|
defer a.mtx.RUnlock()
|
|
|
|
var encoder record.Encoder
|
|
buf := a.bufPool.Get().([]byte)
|
|
|
|
if len(a.pendingSeries) > 0 {
|
|
buf = encoder.Series(a.pendingSeries, buf)
|
|
if err := a.wal.Log(buf); err != nil {
|
|
return err
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
|
|
if len(a.pendingSamples) > 0 {
|
|
buf = encoder.Samples(a.pendingSamples, buf)
|
|
if err := a.wal.Log(buf); err != nil {
|
|
return err
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
|
|
if len(a.pendingExamplars) > 0 {
|
|
buf = encoder.Exemplars(a.pendingExamplars, buf)
|
|
if err := a.wal.Log(buf); err != nil {
|
|
return err
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
|
|
var series *memSeries
|
|
for i, s := range a.pendingSamples {
|
|
series = a.sampleSeries[i]
|
|
if !series.updateTimestamp(s.T) {
|
|
a.metrics.totalOutOfOrderSamples.Inc()
|
|
}
|
|
}
|
|
|
|
//nolint:staticcheck
|
|
a.bufPool.Put(buf)
|
|
return a.Rollback()
|
|
}
|
|
|
|
func (a *appender) Rollback() error {
|
|
a.pendingSeries = a.pendingSeries[:0]
|
|
a.pendingSamples = a.pendingSamples[:0]
|
|
a.pendingExamplars = a.pendingExamplars[:0]
|
|
a.sampleSeries = a.sampleSeries[:0]
|
|
a.appenderPool.Put(a)
|
|
return nil
|
|
}
|