Fix WAL errors and add tests for it

This commit is contained in:
Fabian Reinartz 2017-09-07 10:58:34 +02:00
parent 81222849bc
commit 970bffec8d
2 changed files with 33 additions and 13 deletions

18
wal.go
View file

@ -358,11 +358,14 @@ Loop:
if err := csf.Close(); err != nil { if err := csf.Close(); err != nil {
return errors.Wrap(err, "close tmp file") return errors.Wrap(err, "close tmp file")
} }
if err := renameFile(csf.Name(), candidates[0].Name()); err != nil {
if err := renameFile(csf.Name(), candidates[len(candidates)-1].Name()); err != nil {
return err return err
} }
for _, f := range candidates[1:] { for _, f := range candidates[1:] {
if err := f.Close(); err != nil {
return errors.Wrap(err, "close obsolete WAL segment file")
}
if err := os.RemoveAll(f.Name()); err != nil { if err := os.RemoveAll(f.Name()); err != nil {
return errors.Wrap(err, "delete WAL segment file") return errors.Wrap(err, "delete WAL segment file")
} }
@ -371,6 +374,17 @@ Loop:
return err return err
} }
// The file object of csf still holds the name before rename. Recreate it so
// subsequent truncations do not look at a non-existant file name.
csf.File, err = w.openSegmentFile(candidates[0].Name())
if err != nil {
return err
}
// We don't need it to be open.
if err := csf.Close(); err != nil {
return err
}
w.mtx.Lock() w.mtx.Lock()
w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...) w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...)
w.mtx.Unlock() w.mtx.Unlock()

View file

@ -122,8 +122,8 @@ func TestSegmentWAL_cut(t *testing.T) {
func TestSegmentWAL_Truncate(t *testing.T) { func TestSegmentWAL_Truncate(t *testing.T) {
const ( const (
numMetrics = 250 numMetrics = 50
batch = 50 batch = 10
) )
series, err := readPrometheusLabels("testdata/20k.series", numMetrics) series, err := readPrometheusLabels("testdata/20k.series", numMetrics)
require.NoError(t, err) require.NoError(t, err)
@ -160,10 +160,24 @@ func TestSegmentWAL_Truncate(t *testing.T) {
keep = append(keep, uint64(i)) keep = append(keep, uint64(i))
} }
err = w.Truncate(1000, newListPostings(keep))
require.NoError(t, err)
var expected []RefSeries
for i := 1; i <= numMetrics; i++ {
if i%2 == 1 || uint64(i) >= boundarySeries {
expected = append(expected, RefSeries{Ref: uint64(i), Labels: series[i-1]})
}
}
// Call Truncate once again to see whether we can read the written file without
// creating a new WAL.
err = w.Truncate(1000, newListPostings(keep)) err = w.Truncate(1000, newListPostings(keep))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Close()) require.NoError(t, w.Close())
// The same again with a new WAL.
w, err = OpenSegmentWAL(dir, nil, 0) w, err = OpenSegmentWAL(dir, nil, 0)
require.NoError(t, err) require.NoError(t, err)
@ -175,15 +189,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
return nil return nil
}, nil, nil) }, nil, nil)
var expected []RefSeries require.Equal(t, expected, readSeries)
for i := 1; i <= numMetrics; i++ {
if i%2 == 1 || uint64(i) >= boundarySeries {
expected = append(expected, RefSeries{Ref: uint64(i), Labels: series[i-1]})
}
}
require.Equal(t, len(expected), len(readSeries))
} }
// Symmetrical test of reading and writing to the WAL via its main interface. // Symmetrical test of reading and writing to the WAL via its main interface.