mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
Avoid creation of 0 sized segments. (#527)
If the corrupt segment is full, then we set donePages on open: c59ed492b2/wal/wal.go (L235-L243)
Then, when we try to repair, we set the segment to be a new segment but we don't update donePages: c59ed492b2/wal/wal.go (L334)
When we try to log to this, because donePages is full, we will never log anything to this segment and create a new one: c59ed492b2/wal/wal.go (L486)
This does not cause issues because we simply concatenate the segments on read, thereby transparently skipping this `0b` segment.
This commit is contained in:
parent
b48394eeb3
commit
10d395259b
46
wal/wal.go
46
wal/wal.go
|
@ -228,19 +228,23 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
|
||||||
}
|
}
|
||||||
// Fresh dir, no segments yet.
|
// Fresh dir, no segments yet.
|
||||||
if j == -1 {
|
if j == -1 {
|
||||||
if w.segment, err = CreateSegment(w.dir, 0); err != nil {
|
segment, err := CreateSegment(w.dir, 0)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if w.segment, err = OpenWriteSegment(logger, w.dir, j); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Correctly initialize donePages.
|
|
||||||
stat, err := w.segment.Stat()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
w.donePages = int(stat.Size() / pageSize)
|
|
||||||
|
if err := w.setSegment(segment); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
segment, err := OpenWriteSegment(logger, w.dir, j)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.setSegment(segment); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
go w.run()
|
go w.run()
|
||||||
|
|
||||||
|
@ -331,7 +335,9 @@ func (w *WAL) Repair(origErr error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.segment = s
|
if err := w.setSegment(s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
f, err := os.Open(tmpfn)
|
f, err := os.Open(tmpfn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -382,8 +388,9 @@ func (w *WAL) nextSegment() error {
|
||||||
return errors.Wrap(err, "create new segment file")
|
return errors.Wrap(err, "create new segment file")
|
||||||
}
|
}
|
||||||
prev := w.segment
|
prev := w.segment
|
||||||
w.segment = next
|
if err := w.setSegment(next); err != nil {
|
||||||
w.donePages = 0
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Don't block further writes by fsyncing the last segment.
|
// Don't block further writes by fsyncing the last segment.
|
||||||
w.actorc <- func() {
|
w.actorc <- func() {
|
||||||
|
@ -397,6 +404,19 @@ func (w *WAL) nextSegment() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WAL) setSegment(segment *Segment) error {
|
||||||
|
w.segment = segment
|
||||||
|
|
||||||
|
// Correctly initialize donePages.
|
||||||
|
stat, err := segment.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.donePages = int(stat.Size() / pageSize)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// flushPage writes the new contents of the page to disk. If no more records will fit into
|
// flushPage writes the new contents of the page to disk. If no more records will fit into
|
||||||
// the page, the remaining bytes will be set to zero and a new page will be started.
|
// the page, the remaining bytes will be set to zero and a new page will be started.
|
||||||
// If clear is true, this is enforced regardless of how many bytes are left in the page.
|
// If clear is true, this is enforced regardless of how many bytes are left in the page.
|
||||||
|
|
|
@ -27,7 +27,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWAL_Repair(t *testing.T) {
|
func TestWAL_Repair(t *testing.T) {
|
||||||
|
|
||||||
for name, test := range map[string]struct {
|
for name, test := range map[string]struct {
|
||||||
corrSgm int // Which segment to corrupt.
|
corrSgm int // Which segment to corrupt.
|
||||||
corrFunc func(f *os.File) // Func that applies the corruption.
|
corrFunc func(f *os.File) // Func that applies the corruption.
|
||||||
|
@ -115,7 +114,8 @@ func TestWAL_Repair(t *testing.T) {
|
||||||
// We create 3 segments with 3 records each and
|
// We create 3 segments with 3 records each and
|
||||||
// then corrupt a given record in a given segment.
|
// then corrupt a given record in a given segment.
|
||||||
// As a result we want a repaired WAL with given intact records.
|
// As a result we want a repaired WAL with given intact records.
|
||||||
w, err := NewSize(nil, nil, dir, 3*pageSize)
|
segSize := 3 * pageSize
|
||||||
|
w, err := NewSize(nil, nil, dir, segSize)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
var records [][]byte
|
var records [][]byte
|
||||||
|
@ -136,7 +136,7 @@ func TestWAL_Repair(t *testing.T) {
|
||||||
|
|
||||||
testutil.Ok(t, f.Close())
|
testutil.Ok(t, f.Close())
|
||||||
|
|
||||||
w, err = New(nil, nil, dir)
|
w, err = NewSize(nil, nil, dir, segSize)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
sr, err := NewSegmentsReader(dir)
|
sr, err := NewSegmentsReader(dir)
|
||||||
|
@ -166,6 +166,11 @@ func TestWAL_Repair(t *testing.T) {
|
||||||
t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10])
|
t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure the last segment is the corrupt segment.
|
||||||
|
_, last, err := w.Segments()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, test.corrSgm, last)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue