diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 8617828e2..663fcd5ed 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1261,7 +1261,7 @@ func TestSizeRetention(t *testing.T) { testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") // Create a WAL checkpoint, and compare sizes. - first, last, err := db.Head().wal.Segments() + first, last, err := wal.Segments(db.Head().wal.Dir()) testutil.Ok(t, err) _, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0) testutil.Ok(t, err) diff --git a/tsdb/head.go b/tsdb/head.go index 6fb7b0cd3..d99a48987 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -693,7 +693,7 @@ func (h *Head) Init(minValidTime int64) error { walReplayStart := time.Now() // Find the last segment. - _, last, err := h.wal.Segments() + _, last, err := wal.Segments(h.wal.Dir()) if err != nil { return errors.Wrap(err, "finding WAL segments") } @@ -819,7 +819,7 @@ func (h *Head) Truncate(mint int64) (err error) { } start = time.Now() - first, last, err := h.wal.Segments() + first, last, err := wal.Segments(h.wal.Dir()) if err != nil { return errors.Wrap(err, "get segment range") } @@ -1325,7 +1325,7 @@ func (h *Head) gc() { h.postings.Delete(deleted) if h.wal != nil { - _, last, _ := h.wal.Segments() + _, last, _ := wal.Segments(h.wal.Dir()) h.deletedMtx.Lock() // Keep series records until we're past segment 'last' // because the WAL will still have samples records with diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 1291966ab..3cea3b3e6 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1377,19 +1377,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } add(0) - _, last, err := wlog.Segments() + _, last, err := wal.Segments(wlog.Dir()) testutil.Ok(t, err) testutil.Equals(t, 0, last) add(1) testutil.Ok(t, h.Truncate(1)) - _, last, err = wlog.Segments() + _, last, err = wal.Segments(wlog.Dir()) testutil.Ok(t, err) testutil.Equals(t, 1, last) add(2) testutil.Ok(t, h.Truncate(2)) - _, last, err = wlog.Segments() + _, last, err = wal.Segments(wlog.Dir()) testutil.Ok(t, err) testutil.Equals(t, 2, last) } diff --git a/tsdb/wal/checkpoint_test.go b/tsdb/wal/checkpoint_test.go index 327de1d7c..d9a223889 100644 --- a/tsdb/wal/checkpoint_test.go +++ b/tsdb/wal/checkpoint_test.go @@ -148,7 +148,7 @@ func TestCheckpoint(t *testing.T) { var last int64 for i := 0; ; i++ { - _, n, err := w.Segments() + _, n, err := Segments(w.Dir()) testutil.Ok(t, err) if n >= 106 { break diff --git a/tsdb/wal/reader_test.go b/tsdb/wal/reader_test.go index 13e1932e3..bb9b9739f 100644 --- a/tsdb/wal/reader_test.go +++ b/tsdb/wal/reader_test.go @@ -371,7 +371,7 @@ func TestReaderFuzz_Live(t *testing.T) { }() // Tail the WAL and compare the results. - m, _, err := w.Segments() + m, _, err := Segments(w.Dir()) testutil.Ok(t, err) seg, err := OpenReadSegment(SegmentName(dir, m)) @@ -398,7 +398,7 @@ func TestReaderFuzz_Live(t *testing.T) { select { case <-segmentTicker.C: // check if new segments exist - _, last, err := w.Segments() + _, last, err := Segments(w.Dir()) testutil.Ok(t, err) if last <= seg.i { continue @@ -463,7 +463,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { testutil.Ok(t, err) // Try and LiveReader it. - m, _, err := w.Segments() + m, _, err := Segments(w.Dir()) testutil.Ok(t, err) seg, err := OpenReadSegment(SegmentName(dir, m)) @@ -511,7 +511,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { testutil.Ok(t, err) // Try and LiveReader it. - m, _, err := w.Segments() + m, _, err := Segments(w.Dir()) testutil.Ok(t, err) seg, err := OpenReadSegment(SegmentName(dir, m)) diff --git a/tsdb/wal/wal.go b/tsdb/wal/wal.go index 83cbb61b8..c5401ddf1 100644 --- a/tsdb/wal/wal.go +++ b/tsdb/wal/wal.go @@ -267,7 +267,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi } w.metrics = newWALMetrics(reg) - _, last, err := w.Segments() + _, last, err := Segments(w.Dir()) if err != nil { return nil, errors.Wrap(err, "get segment range") } @@ -279,7 +279,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi writeSegmentIndex = last + 1 } - segment, err := CreateSegment(w.dir, writeSegmentIndex) + segment, err := CreateSegment(w.Dir(), writeSegmentIndex) if err != nil { return nil, err } @@ -355,7 +355,7 @@ func (w *WAL) Repair(origErr error) error { "segment", cerr.Segment, "offset", cerr.Offset) // All segments behind the corruption can no longer be used. - segs, err := listSegments(w.dir) + segs, err := listSegments(w.Dir()) if err != nil { return errors.Wrap(err, "list segments") } @@ -374,7 +374,7 @@ func (w *WAL) Repair(origErr error) error { if s.index <= cerr.Segment { continue } - if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil { + if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil { return errors.Wrapf(err, "delete segment:%v", s.index) } } @@ -383,14 +383,14 @@ func (w *WAL) Repair(origErr error) error { // its records up to the corruption. level.Warn(w.logger).Log("msg", "Rewrite corrupted segment", "segment", cerr.Segment) - fn := SegmentName(w.dir, cerr.Segment) + fn := SegmentName(w.Dir(), cerr.Segment) tmpfn := fn + ".repair" if err := fileutil.Rename(fn, tmpfn); err != nil { return err } // Create a clean segment and make it the active one. - s, err := CreateSegment(w.dir, cerr.Segment) + s, err := CreateSegment(w.Dir(), cerr.Segment) if err != nil { return err } @@ -438,7 +438,7 @@ func (w *WAL) Repair(origErr error) error { // We always want to start writing to a new Segment rather than an existing // Segment, which is handled by NewSize, but earlier in Repair we're deleting // all segments that come after the corrupted Segment. Recreate a new Segment here. - s, err = CreateSegment(w.dir, cerr.Segment+1) + s, err = CreateSegment(w.Dir(), cerr.Segment+1) if err != nil { return err } @@ -468,7 +468,7 @@ func (w *WAL) nextSegment() error { return err } } - next, err := CreateSegment(w.dir, w.segment.Index()+1) + next, err := CreateSegment(w.Dir(), w.segment.Index()+1) if err != nil { return errors.Wrap(err, "create new segment file") } @@ -679,19 +679,6 @@ func (w *WAL) log(rec []byte, final bool) error { return nil } -// Segments returns the range [first, n] of currently existing segments. -// If no segments are found, first and n are -1. -func (w *WAL) Segments() (first, last int, err error) { - refs, err := listSegments(w.dir) - if err != nil { - return 0, 0, err - } - if len(refs) == 0 { - return -1, -1, nil - } - return refs[0].index, refs[len(refs)-1].index, nil -} - // Truncate drops all segments before i. func (w *WAL) Truncate(i int) (err error) { w.metrics.truncateTotal.Inc() @@ -700,7 +687,7 @@ func (w *WAL) Truncate(i int) (err error) { w.metrics.truncateFail.Inc() } }() - refs, err := listSegments(w.dir) + refs, err := listSegments(w.Dir()) if err != nil { return err } @@ -708,7 +695,7 @@ func (w *WAL) Truncate(i int) (err error) { if r.index >= i { break } - if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil { + if err = os.Remove(filepath.Join(w.Dir(), r.name)); err != nil { return err } } @@ -759,6 +746,19 @@ func (w *WAL) Close() (err error) { return nil } +// Segments returns the range [first, n] of currently existing segments. +// If no segments are found, first and n are -1. +func Segments(walDir string) (first, last int, err error) { + refs, err := listSegments(walDir) + if err != nil { + return 0, 0, err + } + if len(refs) == 0 { + return -1, -1, nil + } + return refs[0].index, refs[len(refs)-1].index, nil +} + type segmentRef struct { name string index int diff --git a/tsdb/wal/wal_test.go b/tsdb/wal/wal_test.go index 626caa419..e38a4d806 100644 --- a/tsdb/wal/wal_test.go +++ b/tsdb/wal/wal_test.go @@ -138,7 +138,7 @@ func TestWALRepair_ReadingError(t *testing.T) { records = append(records, b) testutil.Ok(t, w.Log(b)) } - first, last, err := w.Segments() + first, last, err := Segments(w.Dir()) testutil.Ok(t, err) testutil.Equals(t, 3, 1+last-first, "wal creation didn't result in expected number of segments") @@ -156,7 +156,7 @@ func TestWALRepair_ReadingError(t *testing.T) { testutil.Ok(t, err) defer w.Close() - first, last, err = w.Segments() + first, last, err = Segments(w.Dir()) testutil.Ok(t, err) // Backfill segments from the most recent checkpoint onwards. @@ -201,7 +201,7 @@ func TestWALRepair_ReadingError(t *testing.T) { } // Make sure there is a new 0 size Segment after the corrupted Segment. - _, last, err = w.Segments() + _, last, err = Segments(w.Dir()) testutil.Ok(t, err) testutil.Equals(t, test.corrSgm+1, last) fi, err := os.Stat(SegmentName(dir, last)) diff --git a/tsdb/wal/watcher_test.go b/tsdb/wal/watcher_test.go index 1d98d24b2..b2b26ba4c 100644 --- a/tsdb/wal/watcher_test.go +++ b/tsdb/wal/watcher_test.go @@ -141,7 +141,7 @@ func TestTailSamples(t *testing.T) { } // Start read after checkpoint, no more data written. - first, last, err := w.Segments() + first, last, err := Segments(w.Dir()) testutil.Ok(t, err) wt := newWriteToMock() @@ -225,7 +225,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { } testutil.Ok(t, w.Log(recs...)) - _, _, err = w.Segments() + _, _, err = Segments(w.Dir()) testutil.Ok(t, err) wt := newWriteToMock() @@ -317,7 +317,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { } } - _, _, err = w.Segments() + _, _, err = Segments(w.Dir()) testutil.Ok(t, err) wt := newWriteToMock() watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) @@ -386,7 +386,7 @@ func TestReadCheckpoint(t *testing.T) { w.Truncate(32) // Start read after checkpoint, no more data written. - _, _, err = w.Segments() + _, _, err = Segments(w.Dir()) testutil.Ok(t, err) wt := newWriteToMock() @@ -535,7 +535,7 @@ func TestCheckpointSeriesReset(t *testing.T) { } } - _, _, err = w.Segments() + _, _, err = Segments(w.Dir()) testutil.Ok(t, err) wt := newWriteToMock()