Test createBlock and check all os.RemoveAll in the tests for errors. (#549)

Testing that createBlock creates blocks that can be opened.

and checking the os.RemoveAll for errors will catch errors for un-closed files under windows.

Many missing .Close() calls were added for fixing failing os.RemoveAll

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2019-03-19 15:31:57 +02:00 committed by GitHub
parent dd0d3c6f02
commit c3ffdf1a99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 268 additions and 101 deletions

View file

@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)
@ -244,7 +245,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc := json.NewEncoder(f)
enc.SetIndent("", "\t")
var merr MultiError
var merr tsdb_errors.MultiError
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
merr.Add(f.Close())
@ -283,7 +284,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
var closers []io.Closer
defer func() {
if err != nil {
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(closeAll(closers))
err = merr.Err()
@ -350,7 +351,7 @@ func (pb *Block) Close() error {
pb.pendingReaders.Wait()
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(pb.chunkr.Close())
merr.Add(pb.indexr.Close())

View file

@ -32,7 +32,9 @@ import (
func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
dir, err := ioutil.TempDir("", "metaversion")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{}))
@ -44,7 +46,9 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
func TestSetCompactionFailed(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
b, err := OpenBlock(nil, blockDir, nil)
@ -60,6 +64,19 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, b.Close())
}
func TestCreateBlock(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
if err == nil {
testutil.Ok(t, b.Close())
}
testutil.Ok(t, err)
}
// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)

View file

@ -25,6 +25,7 @@ import (
"strings"
"github.com/pkg/errors"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
@ -67,7 +68,7 @@ func LastCheckpoint(dir string) (string, int, error) {
// DeleteCheckpoints deletes all checkpoints in a directory below a given index.
func DeleteCheckpoints(dir string, maxIndex int) error {
var errs MultiError
var errs tsdb_errors.MultiError
files, err := ioutil.ReadDir(dir)
if err != nil {

View file

@ -32,7 +32,9 @@ import (
func TestLastCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
_, _, err = LastCheckpoint(dir)
testutil.Equals(t, ErrNotFound, err)
@ -65,7 +67,9 @@ func TestLastCheckpoint(t *testing.T) {
func TestDeleteCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
testutil.Ok(t, DeleteCheckpoints(dir, 0))
@ -84,7 +88,9 @@ func TestDeleteCheckpoints(t *testing.T) {
func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
var enc RecordEncoder
// Create a dummy segment to bump the initial number.
@ -188,7 +194,9 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wal with an invalid records.
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99}))

View file

@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
@ -451,7 +452,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
return uid, nil
}
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(err)
if err != context.Canceled {
for _, b := range bs {
@ -529,7 +530,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
tmp := dir + ".tmp"
var closers []io.Closer
defer func(t time.Time) {
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(closeAll(closers))
err = merr.Err()
@ -592,7 +593,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
// make sure they are closed if the function exits due to an error above.
var merr MultiError
var merr tsdb_errors.MultiError
for _, w := range closers {
merr.Add(w.Close())
}
@ -658,7 +659,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
overlapping bool
)
defer func() {
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(closeAll(closers))
err = merr.Err()

View file

@ -421,7 +421,9 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
testutil.NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
@ -907,7 +909,9 @@ func TestDisableAutoCompactions(t *testing.T) {
func TestCancelCompactions(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "testCancelCompaction")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
// Create some blocks to fall within the compaction range.
createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000))
@ -918,7 +922,9 @@ func TestCancelCompactions(t *testing.T) {
tmpdirCopy := tmpdir + "Copy"
err = fileutil.CopyDirs(tmpdir, tmpdirCopy)
testutil.Ok(t, err)
defer os.RemoveAll(tmpdirCopy)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdirCopy))
}()
// Measure the compaction time without interupting it.
var timeCompactionUninterrupted time.Duration

48
db.go
View file

@ -15,7 +15,6 @@
package tsdb
import (
"bytes"
"context"
"fmt"
"io"
@ -36,6 +35,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
@ -861,7 +861,7 @@ func (db *DB) Close() error {
g.Go(pb.Close)
}
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(g.Wait())
@ -1089,50 +1089,8 @@ func nextSequenceFile(dir string) (string, int, error) {
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
}
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
}
}
// Err returns the error list as an error or nil if it is empty.
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}
func closeAll(cs []io.Closer) error {
var merr MultiError
var merr tsdb_errors.MultiError
for _, c := range cs {
merr.Add(c.Close())

View file

@ -437,7 +437,9 @@ func TestDB_Snapshot(t *testing.T) {
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
defer func() {
testutil.Ok(t, os.RemoveAll(snap))
}()
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
@ -504,7 +506,9 @@ Outer:
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
defer func() {
testutil.Ok(t, os.RemoveAll(snap))
}()
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
@ -800,7 +804,9 @@ func TestTombstoneClean(t *testing.T) {
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
defer func() {
testutil.Ok(t, os.RemoveAll(snap))
}()
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
@ -1323,10 +1329,13 @@ func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
// Should be set to init values if no WAL or blocks exist so far.
testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
@ -1343,7 +1352,9 @@ func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("wal-only", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
@ -1365,6 +1376,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
testutil.Equals(t, int64(5000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
@ -1372,12 +1384,15 @@ func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("existing-block", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
createBlock(t, dir, genSeries(1, 1, 1000, 2000))
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
testutil.Equals(t, int64(2000), db.head.MinTime())
testutil.Equals(t, int64(2000), db.head.MaxTime())
@ -1385,7 +1400,9 @@ func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("existing-block-and-wal", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
@ -1411,6 +1428,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
db, err := Open(dir, nil, r, nil)
testutil.Ok(t, err)
defer db.Close()
testutil.Equals(t, int64(6000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())

62
errors/errors.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package errors
import (
"bytes"
"fmt"
)
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
}
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
}
}
// Err returns the error list as an error or nil if it is empty.
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}

View file

@ -117,7 +117,9 @@ func TestHead_ReadWAL(t *testing.T) {
}
dir, err := ioutil.TempDir("", "test_read_wal")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
@ -281,7 +283,9 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
}
dir, err := ioutil.TempDir("", "test_delete_series")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
@ -337,13 +341,17 @@ Outer:
for _, c := range cases {
dir, err := ioutil.TempDir("", "test_wal_reload")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
defer w.Close()
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
defer head.Close()
app := head.Appender()
for _, smpl := range smplsAll {
@ -361,8 +369,10 @@ Outer:
// Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
defer reloadedW.Close()
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
testutil.Ok(t, err)
defer reloadedHead.Close()
testutil.Ok(t, reloadedHead.Init(0))
for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
@ -543,7 +553,9 @@ func TestDelete_e2e(t *testing.T) {
seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
}
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
hb, err := NewHead(nil, nil, nil, 100000)
testutil.Ok(t, err)
defer hb.Close()
@ -907,7 +919,9 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
func TestHead_LogRollback(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_rollback")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
@ -974,7 +988,9 @@ func TestWalRepair(t *testing.T) {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_head_repair")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)

View file

@ -150,7 +150,9 @@ func (m mockIndex) LabelIndices() ([][]string, error) {
func TestIndexRW_Create_Open(t *testing.T) {
dir, err := ioutil.TempDir("", "test_index_create")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
fn := filepath.Join(dir, indexFilename)
@ -168,6 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
testutil.Ok(t, err)
_, err = f.WriteAt([]byte{0, 0}, 0)
testutil.Ok(t, err)
f.Close()
_, err = NewFileReader(dir)
testutil.NotOk(t, err)
@ -176,7 +179,9 @@ func TestIndexRW_Create_Open(t *testing.T) {
func TestIndexRW_Postings(t *testing.T) {
dir, err := ioutil.TempDir("", "test_index_postings")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
fn := filepath.Join(dir, indexFilename)
@ -236,7 +241,9 @@ func TestIndexRW_Postings(t *testing.T) {
func TestPersistence_index_e2e(t *testing.T) {
dir, err := ioutil.TempDir("", "test_persistence_e2e")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000)
testutil.Ok(t, err)

View file

@ -21,6 +21,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)
@ -134,7 +135,7 @@ func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
}
func (q *querier) Close() error {
var merr MultiError
var merr tsdb_errors.MultiError
for _, bq := range q.blocks {
merr.Add(bq.Close())
@ -251,7 +252,7 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) {
}
func (q *blockQuerier) Close() error {
var merr MultiError
var merr tsdb_errors.MultiError
merr.Add(q.index.Close())
merr.Add(q.chunks.Close())

View file

@ -1232,7 +1232,9 @@ func BenchmarkPersistedQueries(b *testing.B) {
b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil)
testutil.Ok(b, err)

View file

@ -70,7 +70,9 @@ func TestRepairBadIndexVersion(t *testing.T) {
// Touch chunks dir in block.
os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777)
defer os.RemoveAll(filepath.Join(dbDir, "chunks"))
defer func() {
testutil.Ok(t, os.RemoveAll(filepath.Join(dbDir, "chunks")))
}()
r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename))
testutil.Ok(t, err)
@ -89,7 +91,9 @@ func TestRepairBadIndexVersion(t *testing.T) {
if err = fileutil.CopyDirs(dbDir, tmpDbDir); err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpDir))
}()
// On DB opening all blocks in the base dir should be repaired.
db, err := Open(tmpDir, nil, nil, nil)
testutil.Ok(t, err)
@ -97,6 +101,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename))
testutil.Ok(t, err)
defer r.Close()
p, err = r.Postings("b", "1")
testutil.Ok(t, err)
res := []labels.Labels{}

View file

@ -26,7 +26,9 @@ import (
func TestWriteAndReadbackTombStones(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
ref := uint64(0)

View file

@ -30,6 +30,7 @@ import (
"time"
"github.com/go-kit/kit/log"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/testutil"
)
@ -268,21 +269,43 @@ func generateRandomEntries(w *WAL, records chan []byte) error {
return w.Log(recs...)
}
func allSegments(dir string) (io.Reader, error) {
type multiReadCloser struct {
reader io.Reader
closers []io.Closer
}
func (m *multiReadCloser) Read(p []byte) (n int, err error) {
return m.reader.Read(p)
}
func (m *multiReadCloser) Close() error {
var merr tsdb_errors.MultiError
for _, closer := range m.closers {
merr.Add(closer.Close())
}
return merr.Err()
}
func allSegments(dir string) (io.ReadCloser, error) {
seg, err := listSegments(dir)
if err != nil {
return nil, err
}
var readers []io.Reader
var closers []io.Closer
for _, r := range seg {
f, err := os.Open(filepath.Join(dir, r.name))
if err != nil {
return nil, err
}
readers = append(readers, f)
closers = append(closers, f)
}
return io.MultiReader(readers...), nil
return &multiReadCloser{
reader: io.MultiReader(readers...),
closers: closers,
}, nil
}
func TestReaderFuzz(t *testing.T) {
@ -290,7 +313,9 @@ func TestReaderFuzz(t *testing.T) {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
@ -306,6 +331,7 @@ func TestReaderFuzz(t *testing.T) {
sr, err := allSegments(w.Dir())
testutil.Ok(t, err)
defer sr.Close()
reader := fn(sr)
for expected := range input {
@ -321,10 +347,13 @@ func TestReaderFuzz_Live(t *testing.T) {
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_fuzz_live")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
defer w.Close()
// In the background, generate a stream of random records and write them
// to the WAL.
@ -343,6 +372,7 @@ func TestReaderFuzz_Live(t *testing.T) {
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
defer seg.Close()
r := NewLiveReader(logger, seg)
segmentTicker := time.NewTicker(100 * time.Millisecond)
@ -379,6 +409,7 @@ outer:
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
testutil.Ok(t, err)
defer seg.Close()
r = NewLiveReader(logger, seg)
case <-readTicker.C:
@ -399,7 +430,9 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_live_corrupt")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := NewSize(nil, nil, dir, pageSize)
testutil.Ok(t, err)
@ -429,6 +462,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
defer seg.Close()
r := NewLiveReader(logger, seg)
testutil.Assert(t, r.Next() == false, "expected no records")
@ -440,7 +474,9 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
logger := testutil.NewLogger(t)
dir, err := ioutil.TempDir("", "wal_live_corrupt")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := NewSize(nil, nil, dir, pageSize*2)
testutil.Ok(t, err)
@ -474,6 +510,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
seg, err := OpenReadSegment(SegmentName(dir, m))
testutil.Ok(t, err)
defer seg.Close()
r := NewLiveReader(logger, seg)
testutil.Assert(t, r.Next() == false, "expected no records")

View file

@ -109,7 +109,9 @@ func TestWAL_Repair(t *testing.T) {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_repair")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
// We create 3 segments with 3 records each and
// then corrupt a given record in a given segment.
@ -138,6 +140,7 @@ func TestWAL_Repair(t *testing.T) {
w, err = NewSize(nil, nil, dir, segSize)
testutil.Ok(t, err)
defer w.Close()
sr, err := NewSegmentsReader(dir)
testutil.Ok(t, err)
@ -151,6 +154,7 @@ func TestWAL_Repair(t *testing.T) {
testutil.Ok(t, w.Repair(r.Err()))
sr, err = NewSegmentsReader(dir)
testutil.Ok(t, err)
defer sr.Close()
r = NewReader(sr)
var result [][]byte
@ -181,7 +185,9 @@ func TestWAL_Repair(t *testing.T) {
func TestCorruptAndCarryOn(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_repair")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
var (
logger = testutil.NewLogger(t)
@ -295,13 +301,16 @@ func TestCorruptAndCarryOn(t *testing.T) {
testutil.Equals(t, 9, i, "wrong number of records")
testutil.Assert(t, !reader.Next(), "unexpected record")
testutil.Equals(t, nil, reader.Err())
sr.Close()
}
}
func BenchmarkWAL_LogBatched(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_logbatch")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
w, err := New(nil, nil, "testdir")
testutil.Ok(b, err)
@ -329,7 +338,9 @@ func BenchmarkWAL_LogBatched(b *testing.B) {
func BenchmarkWAL_Log(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_logsingle")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(b, os.RemoveAll(dir))
}()
w, err := New(nil, nil, "testdir")
testutil.Ok(b, err)

View file

@ -36,7 +36,9 @@ import (
func TestSegmentWAL_cut(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_wal_cut")
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
// This calls cut() implicitly the first time without a previous tail.
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
@ -84,7 +86,9 @@ func TestSegmentWAL_Truncate(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_log_truncate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := OpenSegmentWAL(dir, nil, 0, nil)
testutil.Ok(t, err)
@ -163,7 +167,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_log_restore")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
var (
recordedSeries [][]RefSeries
@ -270,7 +276,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
func TestWALRestoreCorrupted_invalidSegment(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_log_restore")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wal, err := OpenSegmentWAL(dir, nil, 0, nil)
testutil.Ok(t, err)
@ -367,7 +375,9 @@ func TestWALRestoreCorrupted(t *testing.T) {
// for the purpose of this test.
dir, err := ioutil.TempDir("", "test_corrupted")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := OpenSegmentWAL(dir, nil, 0, nil)
testutil.Ok(t, err)
@ -442,7 +452,9 @@ func TestMigrateWAL_Empty(t *testing.T) {
// which is valid in the new format.
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wdir := path.Join(dir, "wal")
@ -457,7 +469,9 @@ func TestMigrateWAL_Empty(t *testing.T) {
func TestMigrateWAL_Fuzz(t *testing.T) {
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wdir := path.Join(dir, "wal")