Merge pull request #183 from prometheus/walbrokensegment

Truncate segments on broken header
This commit is contained in:
Fabian Reinartz 2017-10-25 09:27:44 +02:00 committed by GitHub
commit 6ecdaa5314
3 changed files with 111 additions and 58 deletions

67
util_test.go Normal file
View file

@ -0,0 +1,67 @@
// The MIT License (MIT)
// Copyright (c) 2014 Ben Johnson
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package tsdb
import (
"fmt"
"path/filepath"
"reflect"
"runtime"
"testing"
)
// Assert fails the test if the condition is false.
func Assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
if !condition {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
tb.FailNow()
}
}
// Ok fails the test if an err is not nil.
func Ok(tb testing.TB, err error) {
if err != nil {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
tb.FailNow()
}
}
// NotOk fails the test if an err is nil.
func NotOk(tb testing.TB, err error) {
if err == nil {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d: expected error, got nothing \033[39m\n\n", filepath.Base(file), line)
tb.FailNow()
}
}
// Equals fails the test if exp is not equal to act.
func Equals(tb testing.TB, exp, act interface{}) {
if !reflect.DeepEqual(exp, act) {
_, file, line, _ := runtime.Caller(1)
fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
tb.FailNow()
}
}

17
wal.go
View file

@ -222,12 +222,21 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration,
if err != nil {
return nil, err
}
for _, fn := range fns {
for i, fn := range fns {
f, err := w.openSegmentFile(fn)
if err != nil {
return nil, err
if err == nil {
w.files = append(w.files, newSegmentFile(f))
continue
}
w.files = append(w.files, newSegmentFile(f))
level.Warn(logger).Log("msg", "invalid segment file detected, truncating WAL", "err", err, "file", fn)
for _, fn := range fns[i:] {
if err := os.Remove(fn); err != nil {
return w, errors.Wrap(err, "removing segment failed")
}
}
break
}
go w.run(flushInterval)

View file

@ -15,69 +15,16 @@ package tsdb
import (
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"
"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/fileutil"
"github.com/stretchr/testify/require"
)
func TestSegmentWAL_Open(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_wal_open")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
// Create segment files with an appropriate header.
for i := 1; i <= 5; i++ {
metab := make([]byte, 8)
binary.BigEndian.PutUint32(metab[:4], WALMagic)
metab[4] = WALFormatDefault
f, err := os.Create(fmt.Sprintf("%s/000%d", tmpdir, i))
require.NoError(t, err)
_, err = f.Write(metab)
require.NoError(t, err)
require.NoError(t, f.Close())
}
// Initialize 5 correct segment files.
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
require.NoError(t, err)
require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
// Validate that files are locked properly.
for _, of := range w.files {
f, err := os.Open(of.Name())
require.NoError(t, err, "open locked segment %s", f.Name())
_, err = f.Read([]byte{0})
require.NoError(t, err, "read locked segment %s", f.Name())
_, err = f.Write([]byte{0})
require.Error(t, err, "write to tail segment file %s", f.Name())
require.NoError(t, f.Close())
}
for _, f := range w.files {
require.NoError(t, f.Close())
}
// Make initialization fail by corrupting the header of one file.
f, err := os.OpenFile(w.files[3].Name(), os.O_WRONLY, 0666)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0}, 4)
require.NoError(t, err)
w, err = OpenSegmentWAL(tmpdir, nil, 0, nil)
require.Error(t, err, "open with corrupted segments")
}
func TestSegmentWAL_cut(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
require.NoError(t, err)
@ -312,6 +259,36 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
}
}
func TestWALRestoreCorrupted_invalidSegment(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_log_restore")
Ok(t, err)
defer os.RemoveAll(dir)
wal, err := OpenSegmentWAL(dir, nil, 0, nil)
Ok(t, err)
_, err = wal.createSegmentFile(dir + "/000000")
Ok(t, err)
f, err := wal.createSegmentFile(dir + "/000001")
Ok(t, err)
_, err = wal.createSegmentFile(dir + "/000002")
Ok(t, err)
// Make header of second segment invalid.
_, err = f.WriteAt([]byte{1, 2, 3, 4}, 0)
Ok(t, err)
Ok(t, f.Close())
Ok(t, wal.Close())
wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil)
Ok(t, err)
fns, err := fileutil.ReadDir(dir)
Ok(t, err)
Equals(t, []string{"000000"}, fns)
}
// Test reading from a WAL that has been corrupted through various means.
func TestWALRestoreCorrupted(t *testing.T) {
cases := []struct {