TSDB: Rename wal package to wlog (#11352)

The wlog.WL type can now be used to create a Write Ahead Log or a Write
Behind Log.

Before the prefix for wbl metrics was
'prometheus_tsdb_out_of_order_wal_' and has been replaced with
'prometheus_tsdb_out_of_order_wbl_'.

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
Signed-off-by: Jesus Vazquez <jesusvazquez@users.noreply.github.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
Jesus Vazquez 2022-10-10 17:08:46 +02:00 committed by GitHub
parent bfd320e186
commit 775d90d5f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 251 additions and 251 deletions

View file

@ -38,7 +38,7 @@ import (
"github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
const ( const (
@ -348,7 +348,7 @@ type QueueManager struct {
externalLabels labels.Labels externalLabels labels.Labels
relabelConfigs []*relabel.Config relabelConfigs []*relabel.Config
sendExemplars bool sendExemplars bool
watcher *wal.Watcher watcher *wlog.Watcher
metadataWatcher *MetadataWatcher metadataWatcher *MetadataWatcher
clientMtx sync.RWMutex clientMtx sync.RWMutex
@ -381,8 +381,8 @@ type QueueManager struct {
// the WAL directory will be constructed as <dir>/wal. // the WAL directory will be constructed as <dir>/wal.
func NewQueueManager( func NewQueueManager(
metrics *queueManagerMetrics, metrics *queueManagerMetrics,
watcherMetrics *wal.WatcherMetrics, watcherMetrics *wlog.WatcherMetrics,
readerMetrics *wal.LiveReaderMetrics, readerMetrics *wlog.LiveReaderMetrics,
logger log.Logger, logger log.Logger,
dir string, dir string,
samplesIn *ewmaRate, samplesIn *ewmaRate,
@ -430,7 +430,7 @@ func NewQueueManager(
highestRecvTimestamp: highestRecvTimestamp, highestRecvTimestamp: highestRecvTimestamp,
} }
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite) t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite)
if t.mcfg.Send { if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
} }

View file

@ -786,7 +786,7 @@ func BenchmarkSampleSend(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
m.Append(samples) m.Append(samples)
m.UpdateSeriesSegment(series, i+1) // simulate what wal.Watcher.garbageCollectSeries does m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.SeriesReset(i + 1) m.SeriesReset(i + 1)
} }
// Do not include shutdown // Do not include shutdown

View file

@ -29,7 +29,7 @@ import (
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
var ( var (
@ -53,8 +53,8 @@ type WriteStorage struct {
reg prometheus.Registerer reg prometheus.Registerer
mtx sync.Mutex mtx sync.Mutex
watcherMetrics *wal.WatcherMetrics watcherMetrics *wlog.WatcherMetrics
liveReaderMetrics *wal.LiveReaderMetrics liveReaderMetrics *wlog.LiveReaderMetrics
externalLabels labels.Labels externalLabels labels.Labels
dir string dir string
queues map[string]*QueueManager queues map[string]*QueueManager
@ -75,8 +75,8 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, f
} }
rws := &WriteStorage{ rws := &WriteStorage{
queues: make(map[string]*QueueManager), queues: make(map[string]*QueueManager),
watcherMetrics: wal.NewWatcherMetrics(reg), watcherMetrics: wlog.NewWatcherMetrics(reg),
liveReaderMetrics: wal.NewLiveReaderMetrics(reg), liveReaderMetrics: wlog.NewLiveReaderMetrics(reg),
logger: logger, logger: logger,
reg: reg, reg: reg,
flushDeadline: flushDeadline, flushDeadline: flushDeadline,

View file

@ -40,7 +40,7 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
var ErrUnsupported = errors.New("unsupported operation with WAL-only storage") var ErrUnsupported = errors.New("unsupported operation with WAL-only storage")
@ -80,7 +80,7 @@ type Options struct {
// millisecond-precision timestamps. // millisecond-precision timestamps.
func DefaultOptions() *Options { func DefaultOptions() *Options {
return &Options{ return &Options{
WALSegmentSize: wal.DefaultSegmentSize, WALSegmentSize: wlog.DefaultSegmentSize,
WALCompression: false, WALCompression: false,
StripeSize: tsdb.DefaultStripeSize, StripeSize: tsdb.DefaultStripeSize,
TruncateFrequency: DefaultTruncateFrequency, TruncateFrequency: DefaultTruncateFrequency,
@ -219,7 +219,7 @@ type DB struct {
opts *Options opts *Options
rs *remote.Storage rs *remote.Storage
wal *wal.WAL wal *wlog.WL
locker *tsdbutil.DirLocker locker *tsdbutil.DirLocker
appenderPool sync.Pool appenderPool sync.Pool
@ -254,7 +254,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
// remote_write expects WAL to be stored in a "wal" subdirectory of the main storage. // remote_write expects WAL to be stored in a "wal" subdirectory of the main storage.
dir = filepath.Join(dir, "wal") dir = filepath.Join(dir, "wal")
w, err := wal.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression) w, err := wlog.NewSize(l, reg, dir, opts.WALSegmentSize, opts.WALCompression)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "creating WAL") return nil, errors.Wrap(err, "creating WAL")
} }
@ -306,7 +306,7 @@ func validateOptions(opts *Options) *Options {
opts = DefaultOptions() opts = DefaultOptions()
} }
if opts.WALSegmentSize <= 0 { if opts.WALSegmentSize <= 0 {
opts.WALSegmentSize = wal.DefaultSegmentSize opts.WALSegmentSize = wlog.DefaultSegmentSize
} }
// Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2. // Revert Stripesize to DefaultStripsize if Stripsize is either 0 or not a power of 2.
@ -336,7 +336,7 @@ func (db *DB) replayWAL() error {
level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir()) level.Info(db.logger).Log("msg", "replaying WAL, this may take a while", "dir", db.wal.Dir())
start := time.Now() start := time.Now()
dir, startFrom, err := wal.LastCheckpoint(db.wal.Dir()) dir, startFrom, err := wlog.LastCheckpoint(db.wal.Dir())
if err != nil && err != record.ErrNotFound { if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint") return errors.Wrap(err, "find last checkpoint")
} }
@ -344,7 +344,7 @@ func (db *DB) replayWAL() error {
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil { if err == nil {
sr, err := wal.NewSegmentsReader(dir) sr, err := wlog.NewSegmentsReader(dir)
if err != nil { if err != nil {
return errors.Wrap(err, "open checkpoint") return errors.Wrap(err, "open checkpoint")
} }
@ -356,7 +356,7 @@ func (db *DB) replayWAL() error {
// A corrupted checkpoint is a hard error for now and requires user // A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway. // intervention. There's likely little data that can be recovered anyway.
if err := db.loadWAL(wal.NewReader(sr), multiRef); err != nil { if err := db.loadWAL(wlog.NewReader(sr), multiRef); err != nil {
return errors.Wrap(err, "backfill checkpoint") return errors.Wrap(err, "backfill checkpoint")
} }
startFrom++ startFrom++
@ -364,20 +364,20 @@ func (db *DB) replayWAL() error {
} }
// Find the last segment. // Find the last segment.
_, last, err := wal.Segments(db.wal.Dir()) _, last, err := wlog.Segments(db.wal.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "finding WAL segments") return errors.Wrap(err, "finding WAL segments")
} }
// Backfil segments from the most recent checkpoint onwards. // Backfil segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ { for i := startFrom; i <= last; i++ {
seg, err := wal.OpenReadSegment(wal.SegmentName(db.wal.Dir(), i)) seg, err := wlog.OpenReadSegment(wlog.SegmentName(db.wal.Dir(), i))
if err != nil { if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
} }
sr := wal.NewSegmentBufReader(seg) sr := wlog.NewSegmentBufReader(seg)
err = db.loadWAL(wal.NewReader(sr), multiRef) err = db.loadWAL(wlog.NewReader(sr), multiRef)
if err := sr.Close(); err != nil { if err := sr.Close(); err != nil {
level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err) level.Warn(db.logger).Log("msg", "error while closing the wal segments reader", "err", err)
} }
@ -393,7 +393,7 @@ func (db *DB) replayWAL() error {
return nil return nil
} }
func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) { func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
var ( var (
dec record.Decoder dec record.Decoder
lastRef = chunks.HeadSeriesRef(db.nextRef.Load()) lastRef = chunks.HeadSeriesRef(db.nextRef.Load())
@ -422,7 +422,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
series := seriesPool.Get().([]record.RefSeries)[:0] series := seriesPool.Get().([]record.RefSeries)[:0]
series, err = dec.Series(rec, series) series, err = dec.Series(rec, series)
if err != nil { if err != nil {
errCh <- &wal.CorruptionErr{ errCh <- &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode series"), Err: errors.Wrap(err, "decode series"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -434,7 +434,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
samples := samplesPool.Get().([]record.RefSample)[:0] samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples) samples, err = dec.Samples(rec, samples)
if err != nil { if err != nil {
errCh <- &wal.CorruptionErr{ errCh <- &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"), Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -448,7 +448,7 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
// stripeSeries.exemplars in the next block by using setLatestExemplar. // stripeSeries.exemplars in the next block by using setLatestExemplar.
continue continue
default: default:
errCh <- &wal.CorruptionErr{ errCh <- &wlog.CorruptionErr{
Err: errors.Errorf("invalid record type %v", dec.Type(rec)), Err: errors.Errorf("invalid record type %v", dec.Type(rec)),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -563,7 +563,7 @@ func (db *DB) truncate(mint int64) error {
db.gc(mint) db.gc(mint)
level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start)) level.Info(db.logger).Log("msg", "series GC completed", "duration", time.Since(start))
first, last, err := wal.Segments(db.wal.Dir()) first, last, err := wlog.Segments(db.wal.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "get segment range") return errors.Wrap(err, "get segment range")
} }
@ -597,9 +597,9 @@ func (db *DB) truncate(mint int64) error {
db.metrics.checkpointCreationTotal.Inc() db.metrics.checkpointCreationTotal.Inc()
if _, err = wal.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil { if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, keep, mint); err != nil {
db.metrics.checkpointCreationFail.Inc() db.metrics.checkpointCreationFail.Inc()
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
db.metrics.walCorruptionsTotal.Inc() db.metrics.walCorruptionsTotal.Inc()
} }
return errors.Wrap(err, "create checkpoint") return errors.Wrap(err, "create checkpoint")
@ -621,7 +621,7 @@ func (db *DB) truncate(mint int64) error {
db.metrics.checkpointDeleteTotal.Inc() db.metrics.checkpointDeleteTotal.Inc()
db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted))) db.metrics.numWALSeriesPendingDeletion.Set(float64(len(db.deleted)))
if err := wal.DeleteCheckpoints(db.wal.Dir(), last); err != nil { if err := wlog.DeleteCheckpoints(db.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond // Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space. They will just be ignored since a newer checkpoint // occupying disk space. They will just be ignored since a newer checkpoint
// exists. // exists.
@ -641,7 +641,7 @@ func (db *DB) gc(mint int64) {
deleted := db.series.GC(mint) deleted := db.series.GC(mint)
db.metrics.numActiveSeries.Sub(float64(len(deleted))) db.metrics.numActiveSeries.Sub(float64(len(deleted)))
_, last, _ := wal.Segments(db.wal.Dir()) _, last, _ := wlog.Segments(db.wal.Dir())
// We want to keep series records for any newly deleted series // We want to keep series records for any newly deleted series
// until we've passed the last recorded segment. This prevents // until we've passed the last recorded segment. This prevents

View file

@ -35,7 +35,7 @@ import (
"github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -141,7 +141,7 @@ func TestCommit(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
sr, err := wal.NewSegmentsReader(s.wal.Dir()) sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, sr.Close()) require.NoError(t, sr.Close())
@ -149,7 +149,7 @@ func TestCommit(t *testing.T) {
// Read records from WAL and check for expected count of series, samples, and exemplars. // Read records from WAL and check for expected count of series, samples, and exemplars.
var ( var (
r = wal.NewReader(sr) r = wlog.NewReader(sr)
dec record.Decoder dec record.Decoder
walSeriesCount, walSamplesCount, walExemplarsCount int walSeriesCount, walSamplesCount, walExemplarsCount int
@ -211,7 +211,7 @@ func TestRollback(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
sr, err := wal.NewSegmentsReader(s.wal.Dir()) sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, sr.Close()) require.NoError(t, sr.Close())
@ -219,7 +219,7 @@ func TestRollback(t *testing.T) {
// Read records from WAL and check for expected count of series and samples. // Read records from WAL and check for expected count of series and samples.
var ( var (
r = wal.NewReader(sr) r = wlog.NewReader(sr)
dec record.Decoder dec record.Decoder
walSeriesCount, walSamplesCount, walExemplarsCount int walSeriesCount, walSamplesCount, walExemplarsCount int
@ -534,10 +534,10 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
// Read back what was written to the WAL. // Read back what was written to the WAL.
var walExemplarsCount int var walExemplarsCount int
sr, err := wal.NewSegmentsReader(s.wal.Dir()) sr, err := wlog.NewSegmentsReader(s.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
defer sr.Close() defer sr.Close()
r := wal.NewReader(sr) r := wlog.NewReader(sr)
var dec record.Decoder var dec record.Decoder
for r.Next() { for r.Next() {

View file

@ -35,7 +35,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
// In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
@ -485,7 +485,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String()) return filepath.Join(dir, ulid.String())
} }
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head { func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head {
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkDirRoot = chunkDir opts.ChunkDirRoot = chunkDir
head, err := NewHead(nil, nil, w, nil, opts, nil) head, err := NewHead(nil, nil, w, nil, opts, nil)
@ -507,7 +507,7 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str
return head return head
} }
func createHeadWithOOOSamples(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head { func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string, oooSampleFrequency int) *Head {
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkDirRoot = chunkDir opts.ChunkDirRoot = chunkDir
opts.OutOfOrderTimeWindow.Store(10000000000) opts.OutOfOrderTimeWindow.Store(10000000000)

View file

@ -45,7 +45,7 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met. _ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
const ( const (
@ -70,7 +70,7 @@ var ErrNotReady = errors.New("TSDB not ready")
// millisecond precision timestamps. // millisecond precision timestamps.
func DefaultOptions() *Options { func DefaultOptions() *Options {
return &Options{ return &Options{
WALSegmentSize: wal.DefaultSegmentSize, WALSegmentSize: wlog.DefaultSegmentSize,
MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
MinBlockDuration: DefaultBlockDuration, MinBlockDuration: DefaultBlockDuration,
@ -389,14 +389,14 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
if len(blockReaders) > 0 { if len(blockReaders) > 0 {
maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime maxBlockTime = blockReaders[len(blockReaders)-1].Meta().MaxTime
} }
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal"))
if err != nil { if err != nil {
return err return err
} }
var wbl *wal.WAL var wbl *wlog.WL
wblDir := filepath.Join(db.dir, wal.WblDirName) wblDir := filepath.Join(db.dir, wlog.WblDirName)
if _, err := os.Stat(wblDir); !os.IsNotExist(err) { if _, err := os.Stat(wblDir); !os.IsNotExist(err) {
wbl, err = wal.Open(db.logger, wblDir) wbl, err = wlog.Open(db.logger, wblDir)
if err != nil { if err != nil {
return err return err
} }
@ -473,14 +473,14 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
if err := head.Close(); err != nil { if err := head.Close(); err != nil {
return nil, err return nil, err
} }
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) w, err := wlog.Open(db.logger, filepath.Join(db.dir, "wal"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
var wbl *wal.WAL var wbl *wlog.WL
wblDir := filepath.Join(db.dir, wal.WblDirName) wblDir := filepath.Join(db.dir, wlog.WblDirName)
if _, err := os.Stat(wblDir); !os.IsNotExist(err) { if _, err := os.Stat(wblDir); !os.IsNotExist(err) {
wbl, err = wal.Open(db.logger, wblDir) wbl, err = wlog.Open(db.logger, wblDir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -677,7 +677,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
} }
walDir := filepath.Join(dir, "wal") walDir := filepath.Join(dir, "wal")
wblDir := filepath.Join(dir, wal.WblDirName) wblDir := filepath.Join(dir, wlog.WblDirName)
// Migrate old WAL if one exists. // Migrate old WAL if one exists.
if err := MigrateWAL(l, walDir); err != nil { if err := MigrateWAL(l, walDir); err != nil {
@ -739,15 +739,15 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
} }
db.compactCancel = cancel db.compactCancel = cancel
var wlog, wblog *wal.WAL var wal, wbl *wlog.WL
segmentSize := wal.DefaultSegmentSize segmentSize := wlog.DefaultSegmentSize
// Wal is enabled. // Wal is enabled.
if opts.WALSegmentSize >= 0 { if opts.WALSegmentSize >= 0 {
// Wal is set to a custom size. // Wal is set to a custom size.
if opts.WALSegmentSize > 0 { if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize segmentSize = opts.WALSegmentSize
} }
wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression) wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -757,7 +757,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
return nil, err return nil, err
} }
if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 { if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression) wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -781,7 +781,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
// We only override this flag if isolation is disabled at DB level. We use the default otherwise. // We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled headOpts.IsolationDisabled = opts.IsolationDisabled
} }
db.head, err = NewHead(r, l, wlog, wblog, headOpts, stats.Head) db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -813,12 +813,12 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
isOOOErr := isErrLoadOOOWal(initErr) isOOOErr := isErrLoadOOOWal(initErr)
if isOOOErr { if isOOOErr {
level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr) level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
if err := wblog.Repair(initErr); err != nil { if err := wbl.Repair(initErr); err != nil {
return nil, errors.Wrap(err, "repair corrupted OOO WAL") return nil, errors.Wrap(err, "repair corrupted OOO WAL")
} }
} else { } else {
level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr) level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
if err := wlog.Repair(initErr); err != nil { if err := wal.Repair(initErr); err != nil {
return nil, errors.Wrap(err, "repair corrupted WAL") return nil, errors.Wrap(err, "repair corrupted WAL")
} }
} }
@ -947,19 +947,19 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
} }
// Create WBL if it was not present and if OOO is enabled with WAL enabled. // Create WBL if it was not present and if OOO is enabled with WAL enabled.
var wblog *wal.WAL var wblog *wlog.WL
var err error var err error
if db.head.wbl != nil { if db.head.wbl != nil {
// The existing WBL from the disk might have been replayed while OOO was disabled. // The existing WBL from the disk might have been replayed while OOO was disabled.
wblog = db.head.wbl wblog = db.head.wbl
} else if !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0 { } else if !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0 {
segmentSize := wal.DefaultSegmentSize segmentSize := wlog.DefaultSegmentSize
// Wal is set to a custom size. // Wal is set to a custom size.
if db.opts.WALSegmentSize > 0 { if db.opts.WALSegmentSize > 0 {
segmentSize = db.opts.WALSegmentSize segmentSize = db.opts.WALSegmentSize
} }
oooWalDir := filepath.Join(db.dir, wal.WblDirName) oooWalDir := filepath.Join(db.dir, wlog.WblDirName)
wblog, err = wal.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression) wblog, err = wlog.NewSize(db.logger, db.registerer, oooWalDir, segmentSize, db.opts.WALCompression)
if err != nil { if err != nil {
return err return err
} }

View file

@ -51,7 +51,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -230,7 +230,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
f, err := os.OpenFile(path.Join(db.Dir(), "wal", walFiles[0].Name()), os.O_RDWR, 0o666) f, err := os.OpenFile(path.Join(db.Dir(), "wal", walFiles[0].Name()), os.O_RDWR, 0o666)
require.NoError(t, err) require.NoError(t, err)
r := wal.NewReader(bufio.NewReader(f)) r := wlog.NewReader(bufio.NewReader(f))
require.True(t, r.Next(), "reading the series record") require.True(t, r.Next(), "reading the series record")
require.True(t, r.Next(), "reading the first sample record") require.True(t, r.Next(), "reading the first sample record")
// Write an invalid record header to corrupt everything after the first wal sample. // Write an invalid record header to corrupt everything after the first wal sample.
@ -1473,9 +1473,9 @@ func TestSizeRetention(t *testing.T) {
require.Equal(t, expSize, actSize, "registered size doesn't match actual disk size") require.Equal(t, expSize, actSize, "registered size doesn't match actual disk size")
// Create a WAL checkpoint, and compare sizes. // Create a WAL checkpoint, and compare sizes.
first, last, err := wal.Segments(db.Head().wal.Dir()) first, last, err := wlog.Segments(db.Head().wal.Dir())
require.NoError(t, err) require.NoError(t, err)
_, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0) _, err = wlog.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x chunks.HeadSeriesRef) bool { return false }, 0)
require.NoError(t, err) require.NoError(t, err)
blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
walSize, err = db.Head().wal.Size() walSize, err = db.Head().wal.Size()
@ -1881,7 +1881,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
require.NoError(t, err) require.NoError(t, err)
var enc record.Encoder var enc record.Encoder
@ -1923,7 +1923,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
createBlock(t, dir, genSeries(1, 1, 1000, 6000)) createBlock(t, dir, genSeries(1, 1, 1000, 6000))
require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777)) require.NoError(t, os.MkdirAll(path.Join(dir, "wal"), 0o777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) w, err := wlog.New(nil, nil, path.Join(dir, "wal"), false)
require.NoError(t, err) require.NoError(t, err)
var enc record.Encoder var enc record.Encoder
@ -2323,7 +2323,7 @@ func TestDBReadOnly(t *testing.T) {
} }
// Add head to test DBReadOnly WAL reading capabilities. // Add head to test DBReadOnly WAL reading capabilities.
w, err := wal.New(logger, nil, filepath.Join(dbDir, "wal"), true) w, err := wlog.New(logger, nil, filepath.Join(dbDir, "wal"), true)
require.NoError(t, err) require.NoError(t, err)
h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir) h := createHead(t, w, genSeries(1, 1, 16, 18), dbDir)
require.NoError(t, h.Close()) require.NoError(t, h.Close())
@ -3089,7 +3089,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// Check the existing WAL files. // Check the existing WAL files.
first, last, err := wal.Segments(db.head.wal.Dir()) first, last, err := wlog.Segments(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, first) require.Equal(t, 0, first)
require.Equal(t, 60, last) require.Equal(t, 60, last)
@ -3104,14 +3104,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
require.Equal(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal)) require.Equal(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal))
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files). // The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
first, last, err = wal.Segments(db.head.wal.Dir()) first, last, err = wlog.Segments(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 40, first) require.Equal(t, 40, first)
require.Equal(t, 61, last) require.Equal(t, 61, last)
// The first checkpoint would be for first 2/3rd of WAL, hence till 39. // The first checkpoint would be for first 2/3rd of WAL, hence till 39.
// That should be the last checkpoint. // That should be the last checkpoint.
_, cno, err := wal.LastCheckpoint(db.head.wal.Dir()) _, cno, err := wlog.LastCheckpoint(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 39, cno) require.Equal(t, 39, cno)
@ -3147,7 +3147,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
require.Equal(t, newBlockMaxt, db.head.MinTime()) require.Equal(t, newBlockMaxt, db.head.MinTime())
// Another WAL file was rotated. // Another WAL file was rotated.
first, last, err = wal.Segments(db.head.wal.Dir()) first, last, err = wlog.Segments(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 40, first) require.Equal(t, 40, first)
require.Equal(t, 62, last) require.Equal(t, 62, last)
@ -3160,14 +3160,14 @@ func TestOneCheckpointPerCompactCall(t *testing.T) {
require.Equal(t, 59, len(db.Blocks())) require.Equal(t, 59, len(db.Blocks()))
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files). // The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
first, last, err = wal.Segments(db.head.wal.Dir()) first, last, err = wlog.Segments(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 55, first) require.Equal(t, 55, first)
require.Equal(t, 63, last) require.Equal(t, 63, last)
// The first checkpoint would be for first 2/3rd of WAL, hence till 54. // The first checkpoint would be for first 2/3rd of WAL, hence till 54.
// That should be the last checkpoint. // That should be the last checkpoint.
_, cno, err = wal.LastCheckpoint(db.head.wal.Dir()) _, cno, err = wlog.LastCheckpoint(db.head.wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 54, cno) require.Equal(t, 54, cno)
} }
@ -3615,9 +3615,9 @@ func TestOOOWALWrite(t *testing.T) {
} }
getRecords := func(walDir string) []interface{} { getRecords := func(walDir string) []interface{} {
sr, err := wal.NewSegmentsReader(walDir) sr, err := wlog.NewSegmentsReader(walDir)
require.NoError(t, err) require.NoError(t, err)
r := wal.NewReader(sr) r := wlog.NewReader(sr)
defer func() { defer func() {
require.NoError(t, sr.Close()) require.NoError(t, sr.Close())
}() }()
@ -3654,7 +3654,7 @@ func TestOOOWALWrite(t *testing.T) {
require.Equal(t, inOrderRecords, actRecs) require.Equal(t, inOrderRecords, actRecs)
// The OOO WAL. // The OOO WAL.
actRecs = getRecords(path.Join(dir, wal.WblDirName)) actRecs = getRecords(path.Join(dir, wlog.WblDirName))
require.Equal(t, oooRecords, actRecs) require.Equal(t, oooRecords, actRecs)
} }
@ -3848,16 +3848,16 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// Let's create a checkpoint. // Let's create a checkpoint.
first, last, err := wal.Segments(w.Dir()) first, last, err := wlog.Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
keep := func(id chunks.HeadSeriesRef) bool { keep := func(id chunks.HeadSeriesRef) bool {
return id != 3 return id != 3
} }
_, err = wal.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0) _, err = wlog.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0)
require.NoError(t, err) require.NoError(t, err)
// Confirm there's been a checkpoint. // Confirm there's been a checkpoint.
cdir, _, err := wal.LastCheckpoint(w.Dir()) cdir, _, err := wlog.LastCheckpoint(w.Dir())
require.NoError(t, err) require.NoError(t, err)
// Read in checkpoint and WAL. // Read in checkpoint and WAL.
@ -4605,7 +4605,7 @@ func TestOOODisabled(t *testing.T) {
"number of ooo/oob samples mismatch") "number of ooo/oob samples mismatch")
// Verifying that no OOO artifacts were generated. // Verifying that no OOO artifacts were generated.
_, err = os.ReadDir(path.Join(db.Dir(), wal.WblDirName)) _, err = os.ReadDir(path.Join(db.Dir(), wlog.WblDirName))
require.True(t, os.IsNotExist(err)) require.True(t, os.IsNotExist(err))
ms, created, err := db.head.getOrCreate(s1.Hash(), s1) ms, created, err := db.head.getOrCreate(s1.Hash(), s1)
@ -4770,12 +4770,12 @@ func TestWBLAndMmapReplay(t *testing.T) {
resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above. resetMmapToOriginal() // We neet to reset because new duplicate chunks can be written above.
// Removing m-map markers in WBL by rewriting it. // Removing m-map markers in WBL by rewriting it.
newWbl, err := wal.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false) newWbl, err := wlog.New(log.NewNopLogger(), nil, filepath.Join(t.TempDir(), "new_wbl"), false)
require.NoError(t, err) require.NoError(t, err)
sr, err := wal.NewSegmentsReader(originalWblDir) sr, err := wlog.NewSegmentsReader(originalWblDir)
require.NoError(t, err) require.NoError(t, err)
var dec record.Decoder var dec record.Decoder
r, markers, addedRecs := wal.NewReader(sr), 0, 0 r, markers, addedRecs := wlog.NewReader(sr), 0, 0
for r.Next() { for r.Next() {
rec := r.Record() rec := r.Record()
if dec.Type(rec) == record.MmapMarkers { if dec.Type(rec) == record.MmapMarkers {

View file

@ -41,7 +41,7 @@ import (
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
var ( var (
@ -75,7 +75,7 @@ type Head struct {
metrics *headMetrics metrics *headMetrics
opts *HeadOptions opts *HeadOptions
wal, wbl *wal.WAL wal, wbl *wlog.WL
exemplarMetrics *ExemplarMetrics exemplarMetrics *ExemplarMetrics
exemplars ExemplarStorage exemplars ExemplarStorage
logger log.Logger logger log.Logger
@ -186,7 +186,7 @@ type SeriesLifecycleCallback interface {
} }
// NewHead opens the head block in dir. // NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
var err error var err error
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
@ -602,13 +602,13 @@ func (h *Head) Init(minValidTime int64) error {
checkpointReplayStart := time.Now() checkpointReplayStart := time.Now()
// Backfill the checkpoint first if it exists. // Backfill the checkpoint first if it exists.
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir())
if err != nil && err != record.ErrNotFound { if err != nil && err != record.ErrNotFound {
return errors.Wrap(err, "find last checkpoint") return errors.Wrap(err, "find last checkpoint")
} }
// Find the last segment. // Find the last segment.
_, endAt, e := wal.Segments(h.wal.Dir()) _, endAt, e := wlog.Segments(h.wal.Dir())
if e != nil { if e != nil {
return errors.Wrap(e, "finding WAL segments") return errors.Wrap(e, "finding WAL segments")
} }
@ -617,7 +617,7 @@ func (h *Head) Init(minValidTime int64) error {
multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{} multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
if err == nil && startFrom >= snapIdx { if err == nil && startFrom >= snapIdx {
sr, err := wal.NewSegmentsReader(dir) sr, err := wlog.NewSegmentsReader(dir)
if err != nil { if err != nil {
return errors.Wrap(err, "open checkpoint") return errors.Wrap(err, "open checkpoint")
} }
@ -629,7 +629,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user // A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway. // intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil { if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
return errors.Wrap(err, "backfill checkpoint") return errors.Wrap(err, "backfill checkpoint")
} }
h.updateWALReplayStatusRead(startFrom) h.updateWALReplayStatusRead(startFrom)
@ -645,7 +645,7 @@ func (h *Head) Init(minValidTime int64) error {
} }
// Backfill segments from the most recent checkpoint onwards. // Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= endAt; i++ { for i := startFrom; i <= endAt; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
if err != nil { if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
} }
@ -654,7 +654,7 @@ func (h *Head) Init(minValidTime int64) error {
if i == snapIdx { if i == snapIdx {
offset = snapOffset offset = snapOffset
} }
sr, err := wal.NewSegmentBufReaderWithOffset(offset, s) sr, err := wlog.NewSegmentBufReaderWithOffset(offset, s)
if errors.Cause(err) == io.EOF { if errors.Cause(err) == io.EOF {
// File does not exist. // File does not exist.
continue continue
@ -662,7 +662,7 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil { if err != nil {
return errors.Wrapf(err, "segment reader (offset=%d)", offset) return errors.Wrapf(err, "segment reader (offset=%d)", offset)
} }
err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks) err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
if err := sr.Close(); err != nil { if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
} }
@ -677,20 +677,20 @@ func (h *Head) Init(minValidTime int64) error {
wblReplayStart := time.Now() wblReplayStart := time.Now()
if h.wbl != nil { if h.wbl != nil {
// Replay OOO WAL. // Replay OOO WAL.
startFrom, endAt, e = wal.Segments(h.wbl.Dir()) startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
if e != nil { if e != nil {
return errors.Wrap(e, "finding OOO WAL segments") return errors.Wrap(e, "finding OOO WAL segments")
} }
h.startWALReplayStatus(startFrom, endAt) h.startWALReplayStatus(startFrom, endAt)
for i := startFrom; i <= endAt; i++ { for i := startFrom; i <= endAt; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wbl.Dir(), i)) s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
if err != nil { if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i)) return errors.Wrap(err, fmt.Sprintf("open WBL segment: %d", i))
} }
sr := wal.NewSegmentBufReader(s) sr := wlog.NewSegmentBufReader(s)
err = h.loadWBL(wal.NewReader(sr), multiRef, lastMmapRef) err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
if err := sr.Close(); err != nil { if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err) level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
} }
@ -840,7 +840,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef
return mmappedChunks, oooMmappedChunks, lastRef, nil return mmappedChunks, oooMmappedChunks, lastRef, nil
} }
func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) { func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL) {
oooTimeWindow := int64(0) oooTimeWindow := int64(0)
if cfg.StorageConfig.TSDBConfig != nil { if cfg.StorageConfig.TSDBConfig != nil {
oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
@ -872,7 +872,7 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) {
// SetOutOfOrderTimeWindow updates the out of order related parameters. // SetOutOfOrderTimeWindow updates the out of order related parameters.
// If the Head already has a WBL set, then the wbl will be ignored. // If the Head already has a WBL set, then the wbl will be ignored.
func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wal.WAL) { func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL) {
if oooTimeWindow > 0 && h.wbl == nil { if oooTimeWindow > 0 && h.wbl == nil {
h.wbl = wbl h.wbl = wbl
} }
@ -1095,7 +1095,7 @@ func (h *Head) truncateWAL(mint int64) error {
start := time.Now() start := time.Now()
h.lastWALTruncationTime.Store(mint) h.lastWALTruncationTime.Store(mint)
first, last, err := wal.Segments(h.wal.Dir()) first, last, err := wlog.Segments(h.wal.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "get segment range") return errors.Wrap(err, "get segment range")
} }
@ -1127,9 +1127,9 @@ func (h *Head) truncateWAL(mint int64) error {
return ok return ok
} }
h.metrics.checkpointCreationTotal.Inc() h.metrics.checkpointCreationTotal.Inc()
if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc() h.metrics.checkpointCreationFail.Inc()
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { if _, ok := errors.Cause(err).(*wlog.CorruptionErr); ok {
h.metrics.walCorruptionsTotal.Inc() h.metrics.walCorruptionsTotal.Inc()
} }
return errors.Wrap(err, "create checkpoint") return errors.Wrap(err, "create checkpoint")
@ -1152,7 +1152,7 @@ func (h *Head) truncateWAL(mint int64) error {
h.deletedMtx.Unlock() h.deletedMtx.Unlock()
h.metrics.checkpointDeleteTotal.Inc() h.metrics.checkpointDeleteTotal.Inc()
if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil { if err := wlog.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond // Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space. // occupying disk space.
// They will just be ignored since a higher checkpoint exists. // They will just be ignored since a higher checkpoint exists.
@ -1395,7 +1395,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
h.tombstones.TruncateBefore(mint) h.tombstones.TruncateBefore(mint)
if h.wal != nil { if h.wal != nil {
_, last, _ := wal.Segments(h.wal.Dir()) _, last, _ := wlog.Segments(h.wal.Dir())
h.deletedMtx.Lock() h.deletedMtx.Lock()
// Keep series records until we're past segment 'last' // Keep series records until we're past segment 'last'
// because the WAL will still have samples records with // because the WAL will still have samples records with

View file

@ -46,12 +46,12 @@ import (
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wal.WAL) { func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (*Head, *wlog.WL) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -63,14 +63,14 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
} }
h, err := NewHead(nil, nil, wlog, nil, opts, nil) h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error { require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_ chunks.HeadSeriesRef, _ chunks.ChunkDiskMapperRef, _, _ int64, _ uint16, _ chunkenc.Encoding) error {
return nil return nil
})) }))
return h, wlog return h, wal
} }
func BenchmarkCreateSeries(b *testing.B) { func BenchmarkCreateSeries(b *testing.B) {
@ -88,7 +88,7 @@ func BenchmarkCreateSeries(b *testing.B) {
} }
} }
func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { func populateTestWAL(t testing.TB, w *wlog.WL, recs []interface{}) {
var enc record.Encoder var enc record.Encoder
for _, r := range recs { for _, r := range recs {
switch v := r.(type) { switch v := r.(type) {
@ -105,12 +105,12 @@ func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
} }
func readTestWAL(t testing.TB, dir string) (recs []interface{}) { func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
sr, err := wal.NewSegmentsReader(dir) sr, err := wlog.NewSegmentsReader(dir)
require.NoError(t, err) require.NoError(t, err)
defer sr.Close() defer sr.Close()
var dec record.Decoder var dec record.Decoder
r := wal.NewReader(sr) r := wlog.NewReader(sr)
for r.Next() { for r.Next() {
rec := r.Record() rec := r.Record()
@ -189,7 +189,7 @@ func BenchmarkLoadWAL(b *testing.B) {
func(b *testing.B) { func(b *testing.B) {
dir := b.TempDir() dir := b.TempDir()
w, err := wal.New(nil, nil, dir, false) w, err := wlog.New(nil, nil, dir, false)
require.NoError(b, err) require.NoError(b, err)
// Write series. // Write series.
@ -571,7 +571,7 @@ func TestHead_WALMultiRef(t *testing.T) {
require.NotEqual(t, ref1, ref2, "Refs are the same") require.NotEqual(t, ref1, ref2, "Refs are the same")
require.NoError(t, head.Close()) require.NoError(t, head.Close())
w, err = wal.New(nil, nil, w.Dir(), false) w, err = wlog.New(nil, nil, w.Dir(), false)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -879,7 +879,7 @@ func TestHeadDeleteSimple(t *testing.T) {
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
// Compare the samples for both heads - before and after the reloadBlocks. // Compare the samples for both heads - before and after the reloadBlocks.
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. reloadedW, err := wlog.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkRange = 1000 opts.ChunkRange = 1000
@ -1000,7 +1000,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
// Confirm there's been a checkpoint. // Confirm there's been a checkpoint.
cdir, _, err := wal.LastCheckpoint(w.Dir()) cdir, _, err := wlog.LastCheckpoint(w.Dir())
require.NoError(t, err) require.NoError(t, err)
// Read in checkpoint and WAL. // Read in checkpoint and WAL.
recs := readTestWAL(t, cdir) recs := readTestWAL(t, cdir)
@ -1592,7 +1592,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
// Fill the wal and corrupt it. // Fill the wal and corrupt it.
{ {
w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress) w, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), compress)
require.NoError(t, err) require.NoError(t, err)
for i := 1; i <= test.totalRecs; i++ { for i := 1; i <= test.totalRecs; i++ {
@ -1613,7 +1613,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
initErr := h.Init(math.MinInt64) initErr := h.Init(math.MinInt64)
err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
_, corrErr := err.(*wal.CorruptionErr) _, corrErr := err.(*wlog.CorruptionErr)
require.True(t, corrErr, "reading the wal didn't return corruption error") require.True(t, corrErr, "reading the wal didn't return corruption error")
require.NoError(t, h.Close()) // Head will close the wal as well. require.NoError(t, h.Close()) // Head will close the wal as well.
} }
@ -1630,10 +1630,10 @@ func TestWalRepair_DecodingError(t *testing.T) {
// Read the wal content after the repair. // Read the wal content after the repair.
{ {
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) sr, err := wlog.NewSegmentsReader(filepath.Join(dir, "wal"))
require.NoError(t, err) require.NoError(t, err)
defer sr.Close() defer sr.Close()
r := wal.NewReader(sr) r := wlog.NewReader(sr)
var actRec int var actRec int
for r.Next() { for r.Next() {
@ -1655,7 +1655,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
walDir := filepath.Join(dir, "wal") walDir := filepath.Join(dir, "wal")
// Fill the chunk segments and corrupt it. // Fill the chunk segments and corrupt it.
{ {
w, err := wal.New(nil, nil, walDir, false) w, err := wlog.New(nil, nil, walDir, false)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -1717,7 +1717,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
} }
func TestNewWalSegmentOnTruncate(t *testing.T) { func TestNewWalSegmentOnTruncate(t *testing.T) {
h, wlog := newTestHead(t, 1000, false, false) h, wal := newTestHead(t, 1000, false, false)
defer func() { defer func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
}() }()
@ -1729,19 +1729,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
} }
add(0) add(0)
_, last, err := wal.Segments(wlog.Dir()) _, last, err := wlog.Segments(wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, last) require.Equal(t, 0, last)
add(1) add(1)
require.NoError(t, h.Truncate(1)) require.NoError(t, h.Truncate(1))
_, last, err = wal.Segments(wlog.Dir()) _, last, err = wlog.Segments(wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, last) require.Equal(t, 1, last)
add(2) add(2)
require.NoError(t, h.Truncate(2)) require.NoError(t, h.Truncate(2))
_, last, err = wal.Segments(wlog.Dir()) _, last, err = wlog.Segments(wal.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, last) require.Equal(t, 2, last)
} }
@ -1896,12 +1896,12 @@ func TestMemSeriesIsolation(t *testing.T) {
i = addSamples(hb) i = addSamples(hb)
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) wal, err := wlog.NewSize(nil, nil, w.Dir(), 32768, false)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkRange = 1000 opts.ChunkRange = 1000
opts.ChunkDirRoot = wlog.Dir() opts.ChunkDirRoot = wal.Dir()
hb, err = NewHead(nil, nil, wlog, nil, opts, nil) hb, err = NewHead(nil, nil, wal, nil, opts, nil)
defer func() { require.NoError(t, hb.Close()) }() defer func() { require.NoError(t, hb.Close()) }()
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, hb.Init(0)) require.NoError(t, hb.Init(0))
@ -2832,7 +2832,7 @@ func TestChunkSnapshot(t *testing.T) {
} }
openHeadAndCheckReplay := func() { openHeadAndCheckReplay := func() {
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err) require.NoError(t, err)
head, err = NewHead(nil, nil, w, nil, head.opts, nil) head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err) require.NoError(t, err)
@ -3041,7 +3041,7 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, f.Close()) require.NoError(t, f.Close())
// Create new Head which should replay this snapshot. // Create new Head which should replay this snapshot.
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) w, err := wlog.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err) require.NoError(t, err)
// Testing https://github.com/prometheus/prometheus/issues/9437 with the registry. // Testing https://github.com/prometheus/prometheus/issues/9437 with the registry.
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil) head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
@ -3059,7 +3059,7 @@ func TestSnapshotError(t *testing.T) {
// Tests https://github.com/prometheus/prometheus/issues/9725. // Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) { func TestChunkSnapshotReplayBug(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
// Write few series records and samples such that the series references are not in order in the WAL // Write few series records and samples such that the series references are not in order in the WAL
@ -3086,10 +3086,10 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
rec := enc.Series([]record.RefSeries{seriesRec}, buf) rec := enc.Series([]record.RefSeries{seriesRec}, buf)
buf = rec[:0] buf = rec[:0]
require.NoError(t, wlog.Log(rec)) require.NoError(t, wal.Log(rec))
rec = enc.Samples([]record.RefSample{samplesRec}, buf) rec = enc.Samples([]record.RefSample{samplesRec}, buf)
buf = rec[:0] buf = rec[:0]
require.NoError(t, wlog.Log(rec)) require.NoError(t, wal.Log(rec))
} }
// Write a corrupt snapshot to fail the replay on startup. // Write a corrupt snapshot to fail the replay on startup.
@ -3103,7 +3103,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, nil, opts, nil) head, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64)) require.NoError(t, head.Init(math.MinInt64))
defer func() { defer func() {
@ -3126,7 +3126,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) {
func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wlTemp, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots. // Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
@ -3137,7 +3137,7 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, nil, opts, nil) head, err := NewHead(nil, nil, wlTemp, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64)) require.NoError(t, head.Init(math.MinInt64))
@ -3164,9 +3164,9 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
// TODO(codesome): Needs test for ooo WAL repair. // TODO(codesome): Needs test for ooo WAL repair.
func TestOOOWalReplay(t *testing.T) { func TestOOOWalReplay(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -3174,7 +3174,7 @@ func TestOOOWalReplay(t *testing.T) {
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
@ -3211,11 +3211,11 @@ func TestOOOWalReplay(t *testing.T) {
// Restart head. // Restart head.
require.NoError(t, h.Close()) require.NoError(t, h.Close())
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) // Replay happens here. require.NoError(t, h.Init(0)) // Replay happens here.
@ -3248,9 +3248,9 @@ func TestOOOWalReplay(t *testing.T) {
// TestOOOMmapReplay checks the replay at a low level. // TestOOOMmapReplay checks the replay at a low level.
func TestOOOMmapReplay(t *testing.T) { func TestOOOMmapReplay(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -3259,7 +3259,7 @@ func TestOOOMmapReplay(t *testing.T) {
opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds()) opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
@ -3299,11 +3299,11 @@ func TestOOOMmapReplay(t *testing.T) {
// Restart head. // Restart head.
require.NoError(t, h.Close()) require.NoError(t, h.Close())
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err = wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err = wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
h, err = NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err = NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) // Replay happens here. require.NoError(t, h.Init(0)) // Replay happens here.
@ -3373,9 +3373,9 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
wlog, err := wal.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false) wal, err := wlog.NewSize(nil, nil, filepath.Join(h.opts.ChunkDirRoot, "wal"), 32768, false)
require.NoError(t, err) require.NoError(t, err)
h, err = NewHead(nil, nil, wlog, nil, h.opts, nil) h, err = NewHead(nil, nil, wal, nil, h.opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
@ -3408,7 +3408,7 @@ func (c *unsupportedChunk) Encoding() chunkenc.Encoding {
// Tests https://github.com/prometheus/prometheus/issues/10277. // Tests https://github.com/prometheus/prometheus/issues/10277.
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -3417,7 +3417,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
opts.EnableExemplarStorage = true opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
h, err := NewHead(nil, nil, wlog, nil, opts, nil) h, err := NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
@ -3441,7 +3441,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
addChunks() addChunks()
require.NoError(t, h.Close()) require.NoError(t, h.Close())
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) wal, err = wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
require.NoError(t, err) require.NoError(t, err)
mmapFilePath := filepath.Join(dir, "chunks_head", "000001") mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
@ -3451,7 +3451,7 @@ func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, f.Close()) require.NoError(t, f.Close())
h, err = NewHead(nil, nil, wlog, nil, opts, nil) h, err = NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
@ -3467,7 +3467,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
var err error var err error
openHead := func() { openHead := func() {
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -3476,7 +3476,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
opts.EnableMemorySnapshotOnShutdown = true opts.EnableMemorySnapshotOnShutdown = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
h, err = NewHead(nil, nil, wlog, nil, opts, nil) h, err = NewHead(nil, nil, wal, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, h.Init(0)) require.NoError(t, h.Init(0))
} }
@ -3541,9 +3541,9 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
func TestOOOAppendWithNoSeries(t *testing.T) { func TestOOOAppendWithNoSeries(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
@ -3551,7 +3551,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
opts.OutOfOrderCapMax.Store(30) opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds()) opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())
@ -3622,16 +3622,16 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
func TestHeadMinOOOTimeUpdate(t *testing.T) { func TestHeadMinOOOTimeUpdate(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true) wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err) require.NoError(t, err)
oooWlog, err := wal.NewSize(nil, nil, filepath.Join(dir, wal.WblDirName), 32768, true) oooWlog, err := wlog.NewSize(nil, nil, filepath.Join(dir, wlog.WblDirName), 32768, true)
require.NoError(t, err) require.NoError(t, err)
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, h.Close()) require.NoError(t, h.Close())

View file

@ -39,10 +39,10 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) { func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
// Track number of samples that referenced a series we don't know about // Track number of samples that referenced a series we don't know about
// for error reporting. // for error reporting.
var unknownRefs atomic.Uint64 var unknownRefs atomic.Uint64
@ -92,7 +92,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
defer func() { defer func() {
// For CorruptionErr ensure to terminate all workers before exiting. // For CorruptionErr ensure to terminate all workers before exiting.
_, ok := err.(*wal.CorruptionErr) _, ok := err.(*wlog.CorruptionErr)
if ok || seriesCreationErr != nil { if ok || seriesCreationErr != nil {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
processors[i].closeAndDrain() processors[i].closeAndDrain()
@ -148,7 +148,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
series := seriesPool.Get().([]record.RefSeries)[:0] series := seriesPool.Get().([]record.RefSeries)[:0]
series, err = dec.Series(rec, series) series, err = dec.Series(rec, series)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode series"), Err: errors.Wrap(err, "decode series"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -160,7 +160,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
samples := samplesPool.Get().([]record.RefSample)[:0] samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples) samples, err = dec.Samples(rec, samples)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"), Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -172,7 +172,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
tstones := tstonesPool.Get().([]tombstones.Stone)[:0] tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
tstones, err = dec.Tombstones(rec, tstones) tstones, err = dec.Tombstones(rec, tstones)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode tombstones"), Err: errors.Wrap(err, "decode tombstones"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -184,7 +184,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
exemplars, err = dec.Exemplars(rec, exemplars) exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode exemplars"), Err: errors.Wrap(err, "decode exemplars"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -196,7 +196,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
meta := metadataPool.Get().([]record.RefMetadata)[:0] meta := metadataPool.Get().([]record.RefMetadata)[:0]
meta, err := dec.Metadata(rec, meta) meta, err := dec.Metadata(rec, meta)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode metadata"), Err: errors.Wrap(err, "decode metadata"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -481,7 +481,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
return unknownRefs, mmapOverlappingChunks return unknownRefs, mmapOverlappingChunks
} }
func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) { func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track number of samples, m-map markers, that referenced a series we don't know about // Track number of samples, m-map markers, that referenced a series we don't know about
// for error reporting. // for error reporting.
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64 var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
@ -513,7 +513,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
defer func() { defer func() {
// For CorruptionErr ensure to terminate all workers before exiting. // For CorruptionErr ensure to terminate all workers before exiting.
// We also wrap it to identify OOO WBL corruption. // We also wrap it to identify OOO WBL corruption.
_, ok := err.(*wal.CorruptionErr) _, ok := err.(*wlog.CorruptionErr)
if ok { if ok {
err = &errLoadWbl{err: err} err = &errLoadWbl{err: err}
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -543,7 +543,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
samples := samplesPool.Get().([]record.RefSample)[:0] samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples) samples, err = dec.Samples(rec, samples)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"), Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -555,7 +555,7 @@ func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
markers := markersPool.Get().([]record.RefMmapMarker)[:0] markers := markersPool.Get().([]record.RefMmapMarker)[:0]
markers, err = dec.MmapMarkers(rec, markers) markers, err = dec.MmapMarkers(rec, markers)
if err != nil { if err != nil {
decodeErr = &wal.CorruptionErr{ decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode mmap markers"), Err: errors.Wrap(err, "decode mmap markers"),
Segment: r.Segment(), Segment: r.Segment(),
Offset: r.Offset(), Offset: r.Offset(),
@ -931,7 +931,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return stats, errors.Wrap(err, "create chunk snapshot dir") return stats, errors.Wrap(err, "create chunk snapshot dir")
} }
cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
if err != nil { if err != nil {
return stats, errors.Wrap(err, "open chunk snapshot") return stats, errors.Wrap(err, "open chunk snapshot")
} }
@ -1170,7 +1170,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
} }
start := time.Now() start := time.Now()
sr, err := wal.NewSegmentsReader(dir) sr, err := wlog.NewSegmentsReader(dir)
if err != nil { if err != nil {
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot") return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
} }
@ -1241,7 +1241,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
}(i, recordChan) }(i, recordChan)
} }
r := wal.NewReader(sr) r := wlog.NewReader(sr)
var loopErr error var loopErr error
Outer: Outer:
for r.Next() { for r.Next() {

View file

@ -37,7 +37,7 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
// WALEntryType indicates what data a WAL entry contains. // WALEntryType indicates what data a WAL entry contains.
@ -89,7 +89,7 @@ func newWalMetrics(r prometheus.Registerer) *walMetrics {
// WAL is a write ahead log that can log new series labels and samples. // WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged. // It must be completely read before new entries are logged.
// //
// DEPRECATED: use wal pkg combined with the record codex instead. // DEPRECATED: use wlog pkg combined with the record codex instead.
type WAL interface { type WAL interface {
Reader() WALReader Reader() WALReader
LogSeries([]record.RefSeries) error LogSeries([]record.RefSeries) error
@ -146,7 +146,7 @@ func newCRC32() hash.Hash32 {
// SegmentWAL is a write ahead log for series data. // SegmentWAL is a write ahead log for series data.
// //
// DEPRECATED: use wal pkg combined with the record coders instead. // DEPRECATED: use wlog pkg combined with the record coders instead.
type SegmentWAL struct { type SegmentWAL struct {
mtx sync.Mutex mtx sync.Mutex
metrics *walMetrics metrics *walMetrics
@ -1229,7 +1229,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err := os.RemoveAll(tmpdir); err != nil { if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir") return errors.Wrap(err, "cleanup replacement dir")
} }
repl, err := wal.New(logger, nil, tmpdir, false) repl, err := wlog.New(logger, nil, tmpdir, false)
if err != nil { if err != nil {
return errors.Wrap(err, "open new WAL") return errors.Wrap(err, "open new WAL")
} }

View file

@ -34,7 +34,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wal" "github.com/prometheus/prometheus/tsdb/wlog"
) )
func TestSegmentWAL_cut(t *testing.T) { func TestSegmentWAL_cut(t *testing.T) {
@ -450,7 +450,7 @@ func TestMigrateWAL_Empty(t *testing.T) {
wdir := path.Join(dir, "wal") wdir := path.Join(dir, "wal")
// Initialize empty WAL. // Initialize empty WAL.
w, err := wal.New(nil, nil, wdir, false) w, err := wlog.New(nil, nil, wdir, false)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Close()) require.NoError(t, w.Close())
@ -493,7 +493,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
// Perform migration. // Perform migration.
require.NoError(t, MigrateWAL(nil, wdir)) require.NoError(t, MigrateWAL(nil, wdir))
w, err := wal.New(nil, nil, wdir, false) w, err := wlog.New(nil, nil, wdir, false)
require.NoError(t, err) require.NoError(t, err)
// We can properly write some new data after migration. // We can properly write some new data after migration.
@ -505,10 +505,10 @@ func TestMigrateWAL_Fuzz(t *testing.T) {
require.NoError(t, w.Close()) require.NoError(t, w.Close())
// Read back all data. // Read back all data.
sr, err := wal.NewSegmentsReader(wdir) sr, err := wlog.NewSegmentsReader(wdir)
require.NoError(t, err) require.NoError(t, err)
r := wal.NewReader(sr) r := wlog.NewReader(sr)
var res []interface{} var res []interface{}
var dec record.Decoder var dec record.Decoder

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"fmt" "fmt"
@ -93,7 +93,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself. // segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate // This makes it easy to read it through the WAL package and concatenate
// it with the original WAL. // it with the original WAL.
func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) { func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sgmReader io.ReadCloser var sgmReader io.ReadCloser

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"fmt" "fmt"
@ -260,7 +260,7 @@ func TestCheckpoint(t *testing.T) {
} }
func TestCheckpointNoTmpFolderAfterError(t *testing.T) { func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wal with invalid data. // Create a new wlog with invalid data.
dir := t.TempDir() dir := t.TempDir()
w, err := NewSize(nil, nil, dir, 64*1024, false) w, err := NewSize(nil, nil, dir, 64*1024, false)
require.NoError(t, err) require.NoError(t, err)
@ -277,17 +277,17 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, f.Close()) require.NoError(t, f.Close())
// Run the checkpoint and since the wal contains corrupt data this should return an error. // Run the checkpoint and since the wlog contains corrupt data this should return an error.
_, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0) _, err = Checkpoint(log.NewNopLogger(), w, 0, 1, nil, 0)
require.Error(t, err) require.Error(t, err)
// Walk the wal dir to make sure there are no tmp folder left behind after the error. // Walk the wlog dir to make sure there are no tmp folder left behind after the error.
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
if err != nil { if err != nil {
return errors.Wrapf(err, "access err %q: %v", path, err) return errors.Wrapf(err, "access err %q: %v", path, err)
} }
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name())
} }
return nil return nil
}) })

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"encoding/binary" "encoding/binary"

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"encoding/binary" "encoding/binary"

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"bytes" "bytes"
@ -240,7 +240,7 @@ func TestReader_Live(t *testing.T) {
const fuzzLen = 500 const fuzzLen = 500
func generateRandomEntries(w *WAL, records chan []byte) error { func generateRandomEntries(w *WL, records chan []byte) error {
var recs [][]byte var recs [][]byte
for i := 0; i < fuzzLen; i++ { for i := 0; i < fuzzLen; i++ {
var sz int64 var sz int64

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"fmt" "fmt"
@ -301,7 +301,7 @@ func (w *Watcher) firstAndLast() (int, int, error) {
return refs[0], refs[len(refs)-1], nil return refs[0], refs[len(refs)-1], nil
} }
// Copied from tsdb/wal/wal.go so we do not have to open a WAL. // Copied from tsdb/wlog/wlog.go so we do not have to open a WAL.
// Plan is to move WAL watcher to TSDB and dedupe these implementations. // Plan is to move WAL watcher to TSDB and dedupe these implementations.
func (w *Watcher) segments(dir string) ([]int, error) { func (w *Watcher) segments(dir string) ([]int, error) {
files, err := os.ReadDir(dir) files, err := os.ReadDir(dir)

View file

@ -10,7 +10,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"fmt" "fmt"

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"bufio" "bufio"
@ -133,7 +133,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
// If it was torn mid-record, a full read (which the caller should do anyway // If it was torn mid-record, a full read (which the caller should do anyway
// to ensure integrity) will detect it as a corruption by the end. // to ensure integrity) will detect it as a corruption by the end.
if d := stat.Size() % pageSize; d != 0 { if d := stat.Size() % pageSize; d != 0 {
level.Warn(logger).Log("msg", "Last page of the wal is torn, filling it with zeros", "segment", segName) level.Warn(logger).Log("msg", "Last page of the wlog is torn, filling it with zeros", "segment", segName)
if _, err := f.Write(make([]byte, pageSize-d)); err != nil { if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
f.Close() f.Close()
return nil, errors.Wrap(err, "zero-pad torn page") return nil, errors.Wrap(err, "zero-pad torn page")
@ -164,7 +164,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil return &Segment{SegmentFile: f, i: k, dir: filepath.Dir(fn)}, nil
} }
// WAL is a write ahead log that stores records in segment files. // WL is a write log that stores records in segment files.
// It must be read from start to end once before logging new data. // It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called // If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes. // before it's safe to do further writes.
@ -174,7 +174,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
// Records are never split across segments to allow full segments to be // Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records // safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment. // beyond the most recent segment.
type WAL struct { type WL struct {
dir string dir string
logger log.Logger logger log.Logger
segmentSize int segmentSize int
@ -188,10 +188,10 @@ type WAL struct {
compress bool compress bool
snappyBuf []byte snappyBuf []byte
metrics *walMetrics metrics *wlMetrics
} }
type walMetrics struct { type wlMetrics struct {
fsyncDuration prometheus.Summary fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter pageFlushes prometheus.Counter
pageCompletions prometheus.Counter pageCompletions prometheus.Counter
@ -201,12 +201,12 @@ type walMetrics struct {
writesFailed prometheus.Counter writesFailed prometheus.Counter
} }
func newWALMetrics(r prometheus.Registerer) *walMetrics { func newWLMetrics(r prometheus.Registerer) *wlMetrics {
m := &walMetrics{} m := &wlMetrics{}
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "fsync_duration_seconds", Name: "fsync_duration_seconds",
Help: "Duration of WAL fsync.", Help: "Duration of write log fsync.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}) })
m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ m.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
@ -219,19 +219,19 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics {
}) })
m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{ m.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "truncations_failed_total", Name: "truncations_failed_total",
Help: "Total number of WAL truncations that failed.", Help: "Total number of write log truncations that failed.",
}) })
m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ m.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "truncations_total", Name: "truncations_total",
Help: "Total number of WAL truncations attempted.", Help: "Total number of write log truncations attempted.",
}) })
m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ m.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "segment_current", Name: "segment_current",
Help: "WAL segment index that TSDB is currently writing to.", Help: "Write log segment index that TSDB is currently writing to.",
}) })
m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{ m.writesFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "writes_failed_total", Name: "writes_failed_total",
Help: "Total number of WAL writes that failed.", Help: "Total number of write log writes that failed.",
}) })
if r != nil { if r != nil {
@ -250,13 +250,13 @@ func newWALMetrics(r prometheus.Registerer) *walMetrics {
} }
// New returns a new WAL over the given directory. // New returns a new WAL over the given directory.
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error) {
return NewSize(logger, reg, dir, DefaultSegmentSize, compress) return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
} }
// NewSize returns a new WAL over the given directory. // NewSize returns a new write log over the given directory.
// New segments are created with the specified size. // New segments are created with the specified size.
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) { func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error) {
if segmentSize%pageSize != 0 { if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size") return nil, errors.New("invalid segment size")
} }
@ -266,7 +266,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
w := &WAL{ w := &WL{
dir: dir, dir: dir,
logger: logger, logger: logger,
segmentSize: segmentSize, segmentSize: segmentSize,
@ -277,9 +277,9 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
} }
prefix := "prometheus_tsdb_wal_" prefix := "prometheus_tsdb_wal_"
if filepath.Base(dir) == WblDirName { if filepath.Base(dir) == WblDirName {
prefix = "prometheus_tsdb_out_of_order_wal_" prefix = "prometheus_tsdb_out_of_order_wbl_"
} }
w.metrics = newWALMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg)) w.metrics = newWLMetrics(prometheus.WrapRegistererWithPrefix(prefix, reg))
_, last, err := Segments(w.Dir()) _, last, err := Segments(w.Dir())
if err != nil { if err != nil {
@ -308,11 +308,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
} }
// Open an existing WAL. // Open an existing WAL.
func Open(logger log.Logger, dir string) (*WAL, error) { func Open(logger log.Logger, dir string) (*WL, error) {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
w := &WAL{ w := &WL{
dir: dir, dir: dir,
logger: logger, logger: logger,
} }
@ -321,16 +321,16 @@ func Open(logger log.Logger, dir string) (*WAL, error) {
} }
// CompressionEnabled returns if compression is enabled on this WAL. // CompressionEnabled returns if compression is enabled on this WAL.
func (w *WAL) CompressionEnabled() bool { func (w *WL) CompressionEnabled() bool {
return w.compress return w.compress
} }
// Dir returns the directory of the WAL. // Dir returns the directory of the WAL.
func (w *WAL) Dir() string { func (w *WL) Dir() string {
return w.dir return w.dir
} }
func (w *WAL) run() { func (w *WL) run() {
Loop: Loop:
for { for {
select { select {
@ -350,7 +350,7 @@ Loop:
// Repair attempts to repair the WAL based on the error. // Repair attempts to repair the WAL based on the error.
// It discards all data after the corruption. // It discards all data after the corruption.
func (w *WAL) Repair(origErr error) error { func (w *WL) Repair(origErr error) error {
// We could probably have a mode that only discards torn records right around // We could probably have a mode that only discards torn records right around
// the corruption to preserve as data much as possible. // the corruption to preserve as data much as possible.
// But that's not generally applicable if the records have any kind of causality. // But that's not generally applicable if the records have any kind of causality.
@ -466,7 +466,7 @@ func SegmentName(dir string, i int) string {
// NextSegment creates the next segment and closes the previous one asynchronously. // NextSegment creates the next segment and closes the previous one asynchronously.
// It returns the file number of the new file. // It returns the file number of the new file.
func (w *WAL) NextSegment() (int, error) { func (w *WL) NextSegment() (int, error) {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
return w.nextSegment(true) return w.nextSegment(true)
@ -474,7 +474,7 @@ func (w *WAL) NextSegment() (int, error) {
// NextSegmentSync creates the next segment and closes the previous one in sync. // NextSegmentSync creates the next segment and closes the previous one in sync.
// It returns the file number of the new file. // It returns the file number of the new file.
func (w *WAL) NextSegmentSync() (int, error) { func (w *WL) NextSegmentSync() (int, error) {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
return w.nextSegment(false) return w.nextSegment(false)
@ -482,9 +482,9 @@ func (w *WAL) NextSegmentSync() (int, error) {
// nextSegment creates the next segment and closes the previous one. // nextSegment creates the next segment and closes the previous one.
// It returns the file number of the new file. // It returns the file number of the new file.
func (w *WAL) nextSegment(async bool) (int, error) { func (w *WL) nextSegment(async bool) (int, error) {
if w.closed { if w.closed {
return 0, errors.New("wal is closed") return 0, errors.New("wlog is closed")
} }
// Only flush the current page if it actually holds data. // Only flush the current page if it actually holds data.
@ -519,7 +519,7 @@ func (w *WAL) nextSegment(async bool) (int, error) {
return next.Index(), nil return next.Index(), nil
} }
func (w *WAL) setSegment(segment *Segment) error { func (w *WL) setSegment(segment *Segment) error {
w.segment = segment w.segment = segment
// Correctly initialize donePages. // Correctly initialize donePages.
@ -535,7 +535,7 @@ func (w *WAL) setSegment(segment *Segment) error {
// flushPage writes the new contents of the page to disk. If no more records will fit into // flushPage writes the new contents of the page to disk. If no more records will fit into
// the page, the remaining bytes will be set to zero and a new page will be started. // the page, the remaining bytes will be set to zero and a new page will be started.
// If clear is true, this is enforced regardless of how many bytes are left in the page. // If clear is true, this is enforced regardless of how many bytes are left in the page.
func (w *WAL) flushPage(clear bool) error { func (w *WL) flushPage(clear bool) error {
w.metrics.pageFlushes.Inc() w.metrics.pageFlushes.Inc()
p := w.page p := w.page
@ -601,13 +601,13 @@ func (t recType) String() string {
} }
} }
func (w *WAL) pagesPerSegment() int { func (w *WL) pagesPerSegment() int {
return w.segmentSize / pageSize return w.segmentSize / pageSize
} }
// Log writes the records into the log. // Log writes the records into the log.
// Multiple records can be passed at once to reduce writes and increase throughput. // Multiple records can be passed at once to reduce writes and increase throughput.
func (w *WAL) Log(recs ...[]byte) error { func (w *WL) Log(recs ...[]byte) error {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
// Callers could just implement their own list record format but adding // Callers could just implement their own list record format but adding
@ -625,7 +625,7 @@ func (w *WAL) Log(recs ...[]byte) error {
// - the final record of a batch // - the final record of a batch
// - the record is bigger than the page size // - the record is bigger than the page size
// - the current page is full. // - the current page is full.
func (w *WAL) log(rec []byte, final bool) error { func (w *WL) log(rec []byte, final bool) error {
// When the last page flush failed the page will remain full. // When the last page flush failed the page will remain full.
// When the page is full, need to flush it before trying to add more records to it. // When the page is full, need to flush it before trying to add more records to it.
if w.page.full() { if w.page.full() {
@ -721,7 +721,7 @@ func (w *WAL) log(rec []byte, final bool) error {
// LastSegmentAndOffset returns the last segment number of the WAL // LastSegmentAndOffset returns the last segment number of the WAL
// and the offset in that file upto which the segment has been filled. // and the offset in that file upto which the segment has been filled.
func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) { func (w *WL) LastSegmentAndOffset() (seg, offset int, err error) {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
@ -736,7 +736,7 @@ func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
} }
// Truncate drops all segments before i. // Truncate drops all segments before i.
func (w *WAL) Truncate(i int) (err error) { func (w *WL) Truncate(i int) (err error) {
w.metrics.truncateTotal.Inc() w.metrics.truncateTotal.Inc()
defer func() { defer func() {
if err != nil { if err != nil {
@ -758,27 +758,27 @@ func (w *WAL) Truncate(i int) (err error) {
return nil return nil
} }
func (w *WAL) fsync(f *Segment) error { func (w *WL) fsync(f *Segment) error {
start := time.Now() start := time.Now()
err := f.Sync() err := f.Sync()
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
return err return err
} }
// Sync forces a file sync on the current wal segment. This function is meant // Sync forces a file sync on the current write log segment. This function is meant
// to be used only on tests due to different behaviour on Operating Systems // to be used only on tests due to different behaviour on Operating Systems
// like windows and linux // like windows and linux
func (w *WAL) Sync() error { func (w *WL) Sync() error {
return w.fsync(w.segment) return w.fsync(w.segment)
} }
// Close flushes all writes and closes active segment. // Close flushes all writes and closes active segment.
func (w *WAL) Close() (err error) { func (w *WL) Close() (err error) {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
if w.closed { if w.closed {
return errors.New("wal already closed") return errors.New("wlog already closed")
} }
if w.segment == nil { if w.segment == nil {
@ -811,8 +811,8 @@ func (w *WAL) Close() (err error) {
// Segments returns the range [first, n] of currently existing segments. // Segments returns the range [first, n] of currently existing segments.
// If no segments are found, first and n are -1. // If no segments are found, first and n are -1.
func Segments(walDir string) (first, last int, err error) { func Segments(wlDir string) (first, last int, err error) {
refs, err := listSegments(walDir) refs, err := listSegments(wlDir)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
@ -979,8 +979,8 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
return n, nil return n, nil
} }
// Computing size of the WAL. // Size computes the size of the write log.
// We do this by adding the sizes of all the files under the WAL dir. // We do this by adding the sizes of all the files under the WAL dir.
func (w *WAL) Size() (int64, error) { func (w *WL) Size() (int64, error) {
return fileutil.DirSize(w.Dir()) return fileutil.DirSize(w.Dir())
} }

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package wal package wlog
import ( import (
"bytes" "bytes"
@ -137,7 +137,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
} }
first, last, err := Segments(w.Dir()) first, last, err := Segments(w.Dir())
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, 1+last-first, "wal creation didn't result in expected number of segments") require.Equal(t, 3, 1+last-first, "wlog creation didn't result in expected number of segments")
require.NoError(t, w.Close()) require.NoError(t, w.Close())