Test no panic after a WAL corruption (#7625)

* no panic when the head memseries has chunks in it

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

* fix a panic when querying after a WAL corruption.

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

* review nits

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

* Add test for reading the data after a wal corruption.

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

Update tsdb/db_test.go

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

Update tsdb/db_test.go

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

* spellings

Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
Krasimir Georgiev 2020-07-21 10:02:13 +03:00 committed by GitHub
parent 9b8cc663f7
commit ccab2b30c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 2 deletions

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"bufio"
"context" "context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
@ -155,6 +156,67 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet) testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
} }
// TestNoPanicAfterWALCorrutpion ensures that querying the db after a WAL corruption doesn't cause a panic.
// https://github.com/prometheus/prometheus/issues/7548
//
// The test runs in three phases:
//  1. append samples to a single series until the first m-mapped head chunk file appears, then close the db;
//  2. overwrite the record header that follows the first sample record in the first WAL segment;
//  3. reopen the db, check that exactly one WAL corruption was counted, and query the series.
func TestNoPanicAfterWALCorrutpion(t *testing.T) {
	db, closeFn := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil)
	t.Cleanup(closeFn)

	// Append until the first mmaped head chunk.
	// This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted.
	var expSamples []tsdbutil.Sample
	var maxt int64
	{
		for {
			app := db.Appender()
			_, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0)
			expSamples = append(expSamples, sample{t: maxt, v: 0})
			testutil.Ok(t, err)
			testutil.Ok(t, app.Commit())
			mmapedChunks, err := ioutil.ReadDir(mmappedChunksDir(db.Dir()))
			testutil.Ok(t, err)
			if len(mmapedChunks) > 0 {
				break
			}
			maxt++
		}
		testutil.Ok(t, db.Close())
	}

	// Corrupt the WAL after the first sample of the series so that it has at least one sample and
	// it is not garbage collected.
	// The repair deletes all WAL records after the corrupted record and these are read from the mmaped chunk.
	{
		walDir := path.Join(db.Dir(), "wal")
		walFiles, err := ioutil.ReadDir(walDir)
		testutil.Ok(t, err)
		// Fail with a clear message instead of an index-out-of-range panic below.
		testutil.Assert(t, len(walFiles) > 0, "no WAL segment files found")

		f, err := os.OpenFile(path.Join(walDir, walFiles[0].Name()), os.O_RDWR, 0666)
		testutil.Ok(t, err)
		r := wal.NewReader(bufio.NewReader(f))
		testutil.Assert(t, r.Next(), "reading the series record")
		testutil.Assert(t, r.Next(), "reading the first sample record")
		// Write an invalid record header to corrupt everything after the first wal sample.
		_, err = f.WriteAt([]byte{99}, r.Offset())
		testutil.Ok(t, err)
		// The file was written to, so a failed close must not go unnoticed.
		testutil.Ok(t, f.Close())
	}

	// Query the data.
	{
		reopened, err := Open(db.Dir(), nil, nil, nil)
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, reopened.Close())
		}()
		testutil.Equals(t, 1.0, prom_testutil.ToFloat64(reopened.head.metrics.walCorruptionsTotal), "WAL corruption count mismatch")

		querier, err := reopened.Querier(context.TODO(), 0, maxt)
		testutil.Ok(t, err)
		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "", ""))
		// The last sample should be missing as it was after the WAL segment corruption.
		testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: expSamples[:len(expSamples)-1]}, seriesSet)
	}
}
func TestDataNotAvailableAfterRollback(t *testing.T) { func TestDataNotAvailableAfterRollback(t *testing.T) {
db, closeFn := openTestDB(t, nil, nil) db, closeFn := openTestDB(t, nil, nil)
defer func() { defer func() {

View file

@ -2078,8 +2078,9 @@ func (s *memSeries) chunkID(pos int) int {
return pos + s.firstChunkID return pos + s.firstChunkID
} }
// truncateChunksBefore removes all chunks from the series that have not timestamp // truncateChunksBefore removes all chunks from the series that
// at or after mint. Chunk IDs remain unchanged. // have no timestamp at or after mint.
// Chunk IDs remain unchanged.
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
var k int var k int
if s.headChunk != nil && s.headChunk.maxTime < mint { if s.headChunk != nil && s.headChunk.maxTime < mint {