Refactor WAL.Segments method to be part of the wal package (#6477)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-09-01 09:16:57 +00:00 committed by GitHub
parent dba729f6b6
commit 2255b6f62f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 43 additions and 43 deletions

View file

@ -1261,7 +1261,7 @@ func TestSizeRetention(t *testing.T) {
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
// Create a WAL checkpoint, and compare sizes. // Create a WAL checkpoint, and compare sizes.
first, last, err := db.Head().wal.Segments() first, last, err := wal.Segments(db.Head().wal.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0) _, err = wal.Checkpoint(log.NewNopLogger(), db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0)
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -693,7 +693,7 @@ func (h *Head) Init(minValidTime int64) error {
walReplayStart := time.Now() walReplayStart := time.Now()
// Find the last segment. // Find the last segment.
_, last, err := h.wal.Segments() _, last, err := wal.Segments(h.wal.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "finding WAL segments") return errors.Wrap(err, "finding WAL segments")
} }
@ -819,7 +819,7 @@ func (h *Head) Truncate(mint int64) (err error) {
} }
start = time.Now() start = time.Now()
first, last, err := h.wal.Segments() first, last, err := wal.Segments(h.wal.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "get segment range") return errors.Wrap(err, "get segment range")
} }
@ -1325,7 +1325,7 @@ func (h *Head) gc() {
h.postings.Delete(deleted) h.postings.Delete(deleted)
if h.wal != nil { if h.wal != nil {
_, last, _ := h.wal.Segments() _, last, _ := wal.Segments(h.wal.Dir())
h.deletedMtx.Lock() h.deletedMtx.Lock()
// Keep series records until we're past segment 'last' // Keep series records until we're past segment 'last'
// because the WAL will still have samples records with // because the WAL will still have samples records with

View file

@ -1377,19 +1377,19 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
} }
add(0) add(0)
_, last, err := wlog.Segments() _, last, err := wal.Segments(wlog.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, last) testutil.Equals(t, 0, last)
add(1) add(1)
testutil.Ok(t, h.Truncate(1)) testutil.Ok(t, h.Truncate(1))
_, last, err = wlog.Segments() _, last, err = wal.Segments(wlog.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 1, last) testutil.Equals(t, 1, last)
add(2) add(2)
testutil.Ok(t, h.Truncate(2)) testutil.Ok(t, h.Truncate(2))
_, last, err = wlog.Segments() _, last, err = wal.Segments(wlog.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 2, last) testutil.Equals(t, 2, last)
} }

View file

@ -148,7 +148,7 @@ func TestCheckpoint(t *testing.T) {
var last int64 var last int64
for i := 0; ; i++ { for i := 0; ; i++ {
_, n, err := w.Segments() _, n, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
if n >= 106 { if n >= 106 {
break break

View file

@ -371,7 +371,7 @@ func TestReaderFuzz_Live(t *testing.T) {
}() }()
// Tail the WAL and compare the results. // Tail the WAL and compare the results.
m, _, err := w.Segments() m, _, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m)) seg, err := OpenReadSegment(SegmentName(dir, m))
@ -398,7 +398,7 @@ func TestReaderFuzz_Live(t *testing.T) {
select { select {
case <-segmentTicker.C: case <-segmentTicker.C:
// check if new segments exist // check if new segments exist
_, last, err := w.Segments() _, last, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
if last <= seg.i { if last <= seg.i {
continue continue
@ -463,7 +463,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
// Try and LiveReader it. // Try and LiveReader it.
m, _, err := w.Segments() m, _, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m)) seg, err := OpenReadSegment(SegmentName(dir, m))
@ -511,7 +511,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
// Try and LiveReader it. // Try and LiveReader it.
m, _, err := w.Segments() m, _, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
seg, err := OpenReadSegment(SegmentName(dir, m)) seg, err := OpenReadSegment(SegmentName(dir, m))

View file

@ -267,7 +267,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
} }
w.metrics = newWALMetrics(reg) w.metrics = newWALMetrics(reg)
_, last, err := w.Segments() _, last, err := Segments(w.Dir())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "get segment range") return nil, errors.Wrap(err, "get segment range")
} }
@ -279,7 +279,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
writeSegmentIndex = last + 1 writeSegmentIndex = last + 1
} }
segment, err := CreateSegment(w.dir, writeSegmentIndex) segment, err := CreateSegment(w.Dir(), writeSegmentIndex)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -355,7 +355,7 @@ func (w *WAL) Repair(origErr error) error {
"segment", cerr.Segment, "offset", cerr.Offset) "segment", cerr.Segment, "offset", cerr.Offset)
// All segments behind the corruption can no longer be used. // All segments behind the corruption can no longer be used.
segs, err := listSegments(w.dir) segs, err := listSegments(w.Dir())
if err != nil { if err != nil {
return errors.Wrap(err, "list segments") return errors.Wrap(err, "list segments")
} }
@ -374,7 +374,7 @@ func (w *WAL) Repair(origErr error) error {
if s.index <= cerr.Segment { if s.index <= cerr.Segment {
continue continue
} }
if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil { if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil {
return errors.Wrapf(err, "delete segment:%v", s.index) return errors.Wrapf(err, "delete segment:%v", s.index)
} }
} }
@ -383,14 +383,14 @@ func (w *WAL) Repair(origErr error) error {
// its records up to the corruption. // its records up to the corruption.
level.Warn(w.logger).Log("msg", "Rewrite corrupted segment", "segment", cerr.Segment) level.Warn(w.logger).Log("msg", "Rewrite corrupted segment", "segment", cerr.Segment)
fn := SegmentName(w.dir, cerr.Segment) fn := SegmentName(w.Dir(), cerr.Segment)
tmpfn := fn + ".repair" tmpfn := fn + ".repair"
if err := fileutil.Rename(fn, tmpfn); err != nil { if err := fileutil.Rename(fn, tmpfn); err != nil {
return err return err
} }
// Create a clean segment and make it the active one. // Create a clean segment and make it the active one.
s, err := CreateSegment(w.dir, cerr.Segment) s, err := CreateSegment(w.Dir(), cerr.Segment)
if err != nil { if err != nil {
return err return err
} }
@ -438,7 +438,7 @@ func (w *WAL) Repair(origErr error) error {
// We always want to start writing to a new Segment rather than an existing // We always want to start writing to a new Segment rather than an existing
// Segment, which is handled by NewSize, but earlier in Repair we're deleting // Segment, which is handled by NewSize, but earlier in Repair we're deleting
// all segments that come after the corrupted Segment. Recreate a new Segment here. // all segments that come after the corrupted Segment. Recreate a new Segment here.
s, err = CreateSegment(w.dir, cerr.Segment+1) s, err = CreateSegment(w.Dir(), cerr.Segment+1)
if err != nil { if err != nil {
return err return err
} }
@ -468,7 +468,7 @@ func (w *WAL) nextSegment() error {
return err return err
} }
} }
next, err := CreateSegment(w.dir, w.segment.Index()+1) next, err := CreateSegment(w.Dir(), w.segment.Index()+1)
if err != nil { if err != nil {
return errors.Wrap(err, "create new segment file") return errors.Wrap(err, "create new segment file")
} }
@ -679,19 +679,6 @@ func (w *WAL) log(rec []byte, final bool) error {
return nil return nil
} }
// Segments returns the range [first, n] of currently existing segments.
// If no segments are found, first and n are -1.
func (w *WAL) Segments() (first, last int, err error) {
refs, err := listSegments(w.dir)
if err != nil {
return 0, 0, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0].index, refs[len(refs)-1].index, nil
}
// Truncate drops all segments before i. // Truncate drops all segments before i.
func (w *WAL) Truncate(i int) (err error) { func (w *WAL) Truncate(i int) (err error) {
w.metrics.truncateTotal.Inc() w.metrics.truncateTotal.Inc()
@ -700,7 +687,7 @@ func (w *WAL) Truncate(i int) (err error) {
w.metrics.truncateFail.Inc() w.metrics.truncateFail.Inc()
} }
}() }()
refs, err := listSegments(w.dir) refs, err := listSegments(w.Dir())
if err != nil { if err != nil {
return err return err
} }
@ -708,7 +695,7 @@ func (w *WAL) Truncate(i int) (err error) {
if r.index >= i { if r.index >= i {
break break
} }
if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil { if err = os.Remove(filepath.Join(w.Dir(), r.name)); err != nil {
return err return err
} }
} }
@ -759,6 +746,19 @@ func (w *WAL) Close() (err error) {
return nil return nil
} }
// Segments returns the range [first, n] of currently existing segments.
// If no segments are found, first and n are -1.
func Segments(walDir string) (first, last int, err error) {
refs, err := listSegments(walDir)
if err != nil {
return 0, 0, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0].index, refs[len(refs)-1].index, nil
}
type segmentRef struct { type segmentRef struct {
name string name string
index int index int

View file

@ -138,7 +138,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
records = append(records, b) records = append(records, b)
testutil.Ok(t, w.Log(b)) testutil.Ok(t, w.Log(b))
} }
first, last, err := w.Segments() first, last, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 3, 1+last-first, "wal creation didn't result in expected number of segments") testutil.Equals(t, 3, 1+last-first, "wal creation didn't result in expected number of segments")
@ -156,7 +156,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer w.Close() defer w.Close()
first, last, err = w.Segments() first, last, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
// Backfill segments from the most recent checkpoint onwards. // Backfill segments from the most recent checkpoint onwards.
@ -201,7 +201,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
} }
// Make sure there is a new 0 size Segment after the corrupted Segment. // Make sure there is a new 0 size Segment after the corrupted Segment.
_, last, err = w.Segments() _, last, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, test.corrSgm+1, last) testutil.Equals(t, test.corrSgm+1, last)
fi, err := os.Stat(SegmentName(dir, last)) fi, err := os.Stat(SegmentName(dir, last))

View file

@ -141,7 +141,7 @@ func TestTailSamples(t *testing.T) {
} }
// Start read after checkpoint, no more data written. // Start read after checkpoint, no more data written.
first, last, err := w.Segments() first, last, err := Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
wt := newWriteToMock() wt := newWriteToMock()
@ -225,7 +225,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) {
} }
testutil.Ok(t, w.Log(recs...)) testutil.Ok(t, w.Log(recs...))
_, _, err = w.Segments() _, _, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
wt := newWriteToMock() wt := newWriteToMock()
@ -317,7 +317,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
} }
} }
_, _, err = w.Segments() _, _, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
wt := newWriteToMock() wt := newWriteToMock()
watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir) watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir)
@ -386,7 +386,7 @@ func TestReadCheckpoint(t *testing.T) {
w.Truncate(32) w.Truncate(32)
// Start read after checkpoint, no more data written. // Start read after checkpoint, no more data written.
_, _, err = w.Segments() _, _, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
wt := newWriteToMock() wt := newWriteToMock()
@ -535,7 +535,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
} }
} }
_, _, err = w.Segments() _, _, err = Segments(w.Dir())
testutil.Ok(t, err) testutil.Ok(t, err)
wt := newWriteToMock() wt := newWriteToMock()