Merge pull request #332 from prometheus/newwal

wal: add write ahead log package
This commit is contained in:
Fabian Reinartz 2018-08-07 04:14:15 -05:00 committed by GitHub
commit 2a0e96eb19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 2732 additions and 154 deletions

279
checkpoint.go Normal file
View file

@ -0,0 +1,279 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
// CheckpointStats returns stats about a created checkpoint.
type CheckpointStats struct {
DroppedSeries int
DroppedSamples int
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
}
// LastCheckpoint returns the directory name of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
// Traverse list backwards since there may be multiple checkpoints left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return fi.Name(), k, nil
}
return "", 0, ErrNotFound
}
// DeleteCheckpoints deletes all checkpoints in dir that have an index
// below n.
func DeleteCheckpoints(dir string, n int) error {
var errs MultiError
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{}
var sr io.Reader
{
lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
}
if err == nil {
if m > k+1 {
return nil, errors.New("unexpected gap to last checkpoint")
}
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1
last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
sr = last
}
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
}
defer segsr.Close()
if sr != nil {
sr = io.MultiReader(sr, segsr)
} else {
sr = segsr
}
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
r := wal.NewReader(sr)
var (
series []RefSeries
samples []RefSample
tstones []Stone
dec RecordDecoder
enc RecordEncoder
buf []byte
recs [][]byte
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
// Remember where the record for this iteration starts.
start := len(buf)
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
if err != nil {
return nil, errors.Wrap(err, "decode series")
}
// Drop irrelevant series in place.
repl := series[:0]
for _, s := range series {
if keep(s.Ref) {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Series(repl, buf)
}
stats.TotalSeries += len(series)
stats.DroppedSeries += len(series) - len(repl)
case RecordSamples:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, errors.Wrap(err, "decode samples")
}
// Drop irrelevant samples in place.
repl := samples[:0]
for _, s := range samples {
if s.T >= mint {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Samples(repl, buf)
}
stats.TotalSamples += len(samples)
stats.DroppedSamples += len(samples) - len(repl)
case RecordTombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return nil, errors.Wrap(err, "decode deletes")
}
// Drop irrelevant tombstones in place.
repl := tstones[:0]
for _, s := range tstones {
for _, iv := range s.intervals {
if iv.Maxt >= mint {
repl = append(repl, s)
break
}
}
}
if len(repl) > 0 {
buf = enc.Tombstones(repl, buf)
}
stats.TotalTombstones += len(tstones)
stats.DroppedTombstones += len(tstones) - len(repl)
default:
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
continue // All contents discarded.
}
recs = append(recs, buf[start:])
// Flush records in 1 MB increments.
if len(buf) > 1*1024*1024 {
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
buf, recs = buf[:0], recs[:0]
}
}
// If we hit any corruption during checkpointing, repairing is not an option.
// The head won't know which series records are lost.
if r.Err() != nil {
return nil, errors.Wrap(r.Err(), "read segments")
}
// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil
}

180
checkpoint_test.go Normal file
View file

@ -0,0 +1,180 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func TestLastCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s, k, err := LastCheckpoint(dir)
testutil.Equals(t, ErrNotFound, err)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s)
testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s)
testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1", s)
testutil.Equals(t, 1, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1000", s)
testutil.Equals(t, 1000, k)
}
func TestDeleteCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
testutil.Ok(t, DeleteCheckpoints(dir, 0))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0777))
testutil.Ok(t, DeleteCheckpoints(dir, 2))
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files)
}
func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())
// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
testutil.Ok(t, err)
// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))
last += 100
}
testutil.Ok(t, w.Close())
_, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
defer sr.Close()
var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
}
}
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

16
db.go
View file

@ -37,6 +37,7 @@ import (
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
"golang.org/x/sync/errgroup"
)
@ -191,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
}
// Migrate old WAL.
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}
db = &DB{
dir: dir,
@ -221,18 +226,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
return nil, errors.Wrap(err, "create leveled compactor")
}
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
if err != nil {
return nil, err
}
db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
if err != nil {
return nil, err
}
if err := db.reload(); err != nil {
return nil, err
}
if err := db.head.ReadWAL(); err != nil {
if err := db.head.Init(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
@ -501,7 +506,6 @@ func (db *DB) reload() (err error) {
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
@ -596,10 +600,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
return bm[i].MinTime < bm[j].MinTime
})
var (
overlaps [][]BlockMeta

View file

@ -19,6 +19,7 @@ import (
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"testing"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
@ -1025,33 +1027,41 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps")
// Add overlapping blocks.
// Add overlapping blocks. We've to establish order again since we aren't interested
// in trivial overlaps caused by unorderedness.
add := func(ms ...BlockMeta) []BlockMeta {
repl := append(append([]BlockMeta{}, metas...), ms...)
sort.Slice(repl, func(i, j int) bool {
return repl[i].MinTime < repl[j].MinTime
})
return repl
}
// o1 overlaps with 10-20.
o1 := BlockMeta{MinTime: 15, MaxTime: 17}
testutil.Equals(t, Overlaps{
{Min: 15, Max: 17}: {metas[1], o1},
}, OverlappingBlocks(append(metas, o1)))
}, OverlappingBlocks(add(o1)))
// o2 overlaps with 20-30 and 30-40.
o2 := BlockMeta{MinTime: 21, MaxTime: 31}
testutil.Equals(t, Overlaps{
{Min: 21, Max: 30}: {metas[2], o2},
{Min: 30, Max: 31}: {o2, metas[3]},
}, OverlappingBlocks(append(metas, o2)))
}, OverlappingBlocks(add(o2)))
// o3a and o3b overlaps with 30-40 and each other.
o3a := BlockMeta{MinTime: 33, MaxTime: 39}
o3b := BlockMeta{MinTime: 34, MaxTime: 36}
testutil.Equals(t, Overlaps{
{Min: 34, Max: 36}: {metas[3], o3a, o3b},
}, OverlappingBlocks(append(metas, o3a, o3b)))
}, OverlappingBlocks(add(o3a, o3b)))
// o4 is 1:1 overlap with 50-60.
o4 := BlockMeta{MinTime: 50, MaxTime: 60}
testutil.Equals(t, Overlaps{
{Min: 50, Max: 60}: {metas[5], o4},
}, OverlappingBlocks(append(metas, o4)))
}, OverlappingBlocks(add(o4)))
// o5 overlaps with 60-70, 70-80 and 80-90.
o5 := BlockMeta{MinTime: 61, MaxTime: 85}
@ -1059,7 +1069,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 61, Max: 70}: {metas[6], o5},
{Min: 70, Max: 80}: {o5, metas[7]},
{Min: 80, Max: 85}: {o5, metas[8]},
}, OverlappingBlocks(append(metas, o5)))
}, OverlappingBlocks(add(o5)))
// o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
o6a := BlockMeta{MinTime: 92, MaxTime: 105}
@ -1067,7 +1077,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
testutil.Equals(t, Overlaps{
{Min: 94, Max: 99}: {metas[9], o6a, o6b},
{Min: 100, Max: 105}: {o6a, metas[10]},
}, OverlappingBlocks(append(metas, o6a, o6b)))
}, OverlappingBlocks(add(o6a, o6b)))
// All together.
testutil.Equals(t, Overlaps{
@ -1077,7 +1087,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 50, Max: 60}: {metas[5], o4},
{Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]},
{Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]},
}, OverlappingBlocks(append(metas, o1, o2, o3a, o3b, o4, o5, o6a, o6b)))
}, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b)))
// Additional case.
var nc1 []BlockMeta
@ -1185,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}
func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
// Should be set to init values if no WAL or blocks exist so far.
testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
// First added sample initializes the writable range.
app := db.Appender()
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
testutil.Ok(t, err)
testutil.Equals(t, int64(1000), db.head.MinTime())
testutil.Equals(t, int64(1000), db.head.MaxTime())
})
t.Run("wal-only", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(5000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
t.Run("existing-block", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 2000,
})
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(2000), db.head.MinTime())
testutil.Equals(t, int64(2000), db.head.MaxTime())
})
t.Run("existing-block-and-wal", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 6000,
})
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(6000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
}

86
docs/format/wal.md Normal file
View file

@ -0,0 +1,86 @@
# WAL Disk Format
The write ahead log operates in segments that are numbered and sequential,
e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default.
A segment is written to in pages of 32KB. Only the last page of the most recent segment
may be partial. A WAL record is an opaque byte slice that gets split up into sub-records
should it exceed the remaining space of the current page. Records are never split across
segment boundaries. If a single record exceeds the default segment size, a segment with
a larger size will be created.
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format)
Notable deviations are that the record fragment is encoded as:
```
┌───────────┬──────────┬────────────┬──────────────┐
│ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes>
└───────────┴──────────┴────────────┴──────────────┘
```
The type flag has the following states:
* `0`: rest of page will be empty
* `1`: a full record encoded in a single fragment
* `2`: first fragment of a record
* `3`: middle fragment of a record
* `4`: final fragment of a record
## Record encoding
The records written to the write ahead log are encoded as follows:
### Series records
Series records encode the labels that identifies a series and its unique ID.
```
┌────────────────────────────────────────────┐
│ type = 1 <1b>
├────────────────────────────────────────────┤
│ ┌─────────┬──────────────────────────────┐ │
│ │ id <8b> │ n = len(labels) <uvarint> │ │
│ ├─────────┴────────────┬─────────────────┤ │
│ │ len(str_1) <uvarint> │ str_1 <bytes> │ │
│ ├──────────────────────┴─────────────────┤ │
│ │ ... │ │
│ ├───────────────────────┬────────────────┤ │
│ │ len(str_2n) <uvarint> │ str_2n <bytes> │ │
│ └───────────────────────┴────────────────┘ │
│ . . . │
└────────────────────────────────────────────┘
```
### Sample records
Sample records encode samples as a list of triples `(series_id, timestamp, value)`.
Series reference and timestamp are encoded as deltas w.r.t the first sample.
```
┌──────────────────────────────────────────────────────────────────┐
│ type = 2 <1b>
├──────────────────────────────────────────────────────────────────┤
│ ┌────────────────────┬───────────────────────────┬─────────────┐ │
│ │ id <8b> │ timestamp <8b> │ value <8b> │ │
│ └────────────────────┴───────────────────────────┴─────────────┘ │
│ ┌────────────────────┬───────────────────────────┬─────────────┐ │
│ │ id_delta <uvarint> │ timestamp_delta <uvarint> │ value <8b> │ │
│ └────────────────────┴───────────────────────────┴─────────────┘ │
│ . . . │
└──────────────────────────────────────────────────────────────────┘
```
### Tombstone records
Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)`
and specify an interval for which samples of a series got deleted.
```
┌─────────────────────────────────────────────────────┐
│ type = 3 <1b>
├─────────────────────────────────────────────────────┤
│ ┌─────────┬───────────────────┬───────────────────┐ │
│ │ id <8b> │ min_time <varint> │ max_time <varint> │ │
│ └─────────┴───────────────────┴───────────────────┘ │
│ . . . │
└─────────────────────────────────────────────────────┘
```

View file

@ -6,6 +6,7 @@ package fileutil
import (
"os"
"path/filepath"
"sort"
)
@ -23,3 +24,45 @@ func ReadDir(dirpath string) ([]string, error) {
sort.Strings(names)
return names, nil
}
// Rename safely renames a file.
func Rename(from, to string) error {
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}
// Replace moves a file or directory to a new location and deletes any previous data.
// It is not atomic.
func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return nil
}
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}

345
head.go
View file

@ -15,6 +15,7 @@ package tsdb
import (
"math"
"path/filepath"
"runtime"
"sort"
"strings"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)
var (
@ -53,9 +55,10 @@ var (
type Head struct {
chunkRange int64
metrics *headMetrics
wal WAL
wal *wal.WAL
logger log.Logger
appendPool sync.Pool
bytesPool sync.Pool
minTime, maxTime int64
lastSeriesID uint64
@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}
// NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) {
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
if wal == nil {
wal = NopWAL()
}
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
}
@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MinInt64,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
series: newStripeSeries(),
values: map[string]stringset{},
@ -200,15 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
mint int64,
minValidTime int64,
partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
defer close(output)
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input {
for _, s := range samples {
if s.T < mint || s.Ref%total != partition {
if s.T < minValidTime || s.Ref%total != partition {
continue
}
ms := h.series.getByID(s.Ref)
@ -221,18 +223,48 @@ func (h *Head) processWALSamples(
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
}
output <- samples
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
// ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error {
defer h.postings.EnsureOrder()
func (h *Head) updateMinMaxTime(mint, maxt int64) {
for {
lt := h.MinTime()
if mint >= lt {
break
}
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
break
}
}
for {
ht := h.MaxTime()
if maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
break
}
}
}
r := h.wal.Reader()
mint := h.MinTime()
func (h *Head) loadWAL(r *wal.Reader) error {
minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}
// Track number of samples that referenced a series we don't know about
// for error reporting.
@ -253,7 +285,7 @@ func (h *Head) ReadWAL() error {
output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(i, input, output)
@ -263,49 +295,71 @@ func (h *Head) ReadWAL() error {
input = output
}
// TODO(fabxc): series entries spread between samples can starve the sample workers.
// Even with bufferd channels, this can impact startup time with lots of series churn.
// We must not paralellize series creation itself but could make the indexing asynchronous.
seriesFunc := func(series []RefSeries) {
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
var (
dec RecordDecoder
series []RefSeries
samples []RefSample
tstones []Stone
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record()
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
switch dec.Type(rec) {
case RecordSeries:
series, err := dec.Series(rec, series)
if err != nil {
return errors.Wrap(err, "decode series")
}
}
}
samplesFunc := func(samples []RefSample) {
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
}
}
deletesFunc := func(stones []Stone) {
for _, s := range stones {
for _, itv := range s.intervals {
if itv.Maxt < mint {
continue
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
h.tombstones.addInterval(s.ref, itv)
}
case RecordSamples:
samples, err := dec.Samples(rec, samples)
if err != nil {
return errors.Wrap(err, "decode samples")
}
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
}
case RecordTombstones:
tstones, err := dec.Tombstones(rec, tstones)
if err != nil {
return errors.Wrap(err, "decode tombstones")
}
for _, s := range tstones {
for _, itv := range s.intervals {
if itv.Maxt < minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
}
}
default:
return errors.Errorf("invalid record type %v", dec.Type(rec))
}
}
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
// Signal termination to first worker and wait for last one to close its output channel.
close(firstInput)
@ -313,20 +367,64 @@ func (h *Head) ReadWAL() error {
}
wg.Wait()
if err != nil {
return errors.Wrap(err, "consume WAL")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
}
return nil
}
// Truncate removes all data before mint from the head block and truncates its WAL.
func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MinInt64
// Init loads data from the write ahead log and prepares the head for writes.
func (h *Head) Init() error {
defer h.postings.EnsureOrder()
if h.MinTime() >= mint {
if h.wal == nil {
return nil
}
// Backfill the checkpoint first if it exists.
cp, n, err := LastCheckpoint(h.wal.Dir())
if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
if err == nil {
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp))
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer sr.Close()
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
n++
}
// Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1)
if err != nil {
return errors.Wrap(err, "open WAL segments")
}
defer sr.Close()
err = h.loadWAL(wal.NewReader(sr))
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
return nil
}
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint && !initialize {
return nil
}
atomic.StoreInt64(&h.minTime, mint)
@ -348,18 +446,37 @@ func (h *Head) Truncate(mint int64) error {
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
if h.wal == nil {
return nil
}
start = time.Now()
m, n, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "get segment range")
}
n-- // Never consider last segment for checkpoint.
if n < 0 {
return nil // no segments yet.
}
// The lower third of segments should contain mostly obsolete samples.
// If we have less than three segments, it's not worth checkpointing yet.
n = m + (n-m)/3
if n <= m {
return nil
}
keep := func(id uint64) bool {
return h.series.getByID(id) != nil
}
if err := h.wal.Truncate(mint, keep); err == nil {
level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start))
} else {
level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
return errors.Wrap(err, "create checkpoint")
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
"low", m, "high", n, "duration", time.Since(start))
return nil
}
@ -367,10 +484,7 @@ func (h *Head) Truncate(mint int64) error {
// for a compltely fresh head with an empty WAL.
// Returns true if the initialization took an effect.
func (h *Head) initTime(t int64) (initialized bool) {
// In the init state, the head has a high timestamp of math.MinInt64.
mint, _ := rangeForTimestamp(t, h.chunkRange)
if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) {
if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
return false
}
// Ensure that max time is initialized to at least the min time we just set.
@ -441,7 +555,7 @@ func (h *Head) Appender() Appender {
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MinInt64 {
if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h}
}
return h.appender()
@ -449,10 +563,11 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}
@ -468,16 +583,29 @@ func (h *Head) putAppendBuffer(b []RefSample) {
h.appendPool.Put(b[:0])
}
func (h *Head) getBytesBuffer() []byte {
b := h.bytesPool.Get()
if b == nil {
return make([]byte, 0, 1024)
}
return b.([]byte)
}
func (h *Head) putBytesBuffer(b []byte) {
h.bytesPool.Put(b[:0])
}
type headAppender struct {
head *Head
mint, maxt int64
head *Head
minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
series []RefSeries
samples []RefSample
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.mint {
if t < a.minValidTime {
return 0, ErrOutOfBounds
}
@ -504,9 +632,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil {
return err
}
if t < a.mint {
if t < a.minValidTime {
return ErrOutOfBounds
}
if t < a.mint {
a.mint = t
}
if t > a.maxt {
a.maxt = t
}
@ -520,15 +651,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
return nil
}
func (a *headAppender) log() error {
if a.head.wal == nil {
return nil
}
buf := a.head.getBytesBuffer()
defer func() { a.head.putBytesBuffer(buf) }()
var rec []byte
var enc RecordEncoder
if len(a.series) > 0 {
rec = enc.Series(a.series, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log series")
}
}
if len(a.samples) > 0 {
rec = enc.Samples(a.samples, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log samples")
}
}
return nil
}
func (a *headAppender) Commit() error {
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
if err := a.head.wal.LogSeries(a.series); err != nil {
return err
}
if err := a.head.wal.LogSamples(a.samples); err != nil {
return errors.Wrap(err, "WAL log samples")
if err := a.log(); err != nil {
return errors.Wrap(err, "write to WAL")
}
total := len(a.samples)
@ -548,16 +706,7 @@ func (a *headAppender) Commit() error {
}
a.head.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) {
break
}
}
a.head.updateMinMaxTime(a.mint, a.maxt)
return nil
}
@ -568,7 +717,8 @@ func (a *headAppender) Rollback() error {
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
return a.head.wal.LogSeries(a.series)
a.samples = nil
return a.log()
}
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
@ -601,8 +751,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
if p.Err() != nil {
return p.Err()
}
if err := h.wal.LogDeletes(stones); err != nil {
return err
var enc RecordEncoder
if h.wal != nil {
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err
}
}
for _, s := range stones {
h.tombstones.addInterval(s.ref, s.intervals[0])
@ -694,6 +848,9 @@ func (h *Head) MaxTime() int64 {
// Close flushes the WAL and closes the head.
func (h *Head) Close() error {
if h.wal == nil {
return nil
}
return h.wal.Close()
}

View file

@ -14,7 +14,9 @@
package tsdb
import (
"io/ioutil"
"math/rand"
"os"
"testing"
"github.com/prometheus/tsdb/chunkenc"
@ -22,6 +24,7 @@ import (
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func BenchmarkCreateSeries(b *testing.B) {
@ -42,42 +45,50 @@ func BenchmarkCreateSeries(b *testing.B) {
}
}
type memoryWAL struct {
nopWAL
entries []interface{}
}
func (w *memoryWAL) LogSeries(s []RefSeries) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) LogSamples(s []RefSample) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) LogDeletes(s []Stone) error {
w.entries = append(w.entries, s)
return nil
}
func (w *memoryWAL) Reader() WALReader {
return w
}
func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error {
for _, e := range w.entries {
switch v := e.(type) {
func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
var enc RecordEncoder
for _, r := range recs {
switch v := r.(type) {
case []RefSeries:
series(v)
testutil.Ok(t, w.Log(enc.Series(v, nil)))
case []RefSample:
samples(v)
testutil.Ok(t, w.Log(enc.Samples(v, nil)))
case []Stone:
deletes(v)
testutil.Ok(t, w.Log(enc.Tombstones(v, nil)))
}
}
return nil
}
func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
sr, err := wal.NewSegmentsReader(dir)
testutil.Ok(t, err)
defer sr.Close()
var dec RecordDecoder
r := wal.NewReader(sr)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err := dec.Series(rec, nil)
testutil.Ok(t, err)
recs = append(recs, series)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
recs = append(recs, samples)
case RecordTombstones:
tstones, err := dec.Tombstones(rec, nil)
testutil.Ok(t, err)
recs = append(recs, tstones)
default:
t.Fatalf("unknown record type")
}
}
testutil.Ok(t, r.Err())
return recs
}
func TestHead_ReadWAL(t *testing.T) {
@ -100,13 +111,19 @@ func TestHead_ReadWAL(t *testing.T) {
{Ref: 50, T: 101, V: 6},
},
}
wal := &memoryWAL{entries: entries}
dir, err := ioutil.TempDir("", "test_read_wal")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
head, err := NewHead(nil, nil, wal, 1000)
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
populateTestWAL(t, w, entries)
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
defer head.Close()
testutil.Ok(t, head.ReadWAL())
testutil.Ok(t, head.Init())
testutil.Equals(t, uint64(100), head.lastSeriesID)
s10 := head.series.getByID(10)
@ -259,13 +276,19 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
{Ref: 50, T: 90, V: 1},
},
}
wal := &memoryWAL{entries: entries}
dir, err := ioutil.TempDir("", "test_delete_series")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
head, err := NewHead(nil, nil, wal, 1000)
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
populateTestWAL(t, w, entries)
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
defer head.Close()
testutil.Ok(t, head.ReadWAL())
testutil.Ok(t, head.Init())
testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}
@ -705,7 +728,7 @@ func TestMemSeries_append(t *testing.T) {
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
h, err := NewHead(nil, nil, NopWAL(), 1000)
h, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(t, err)
defer h.Close()
@ -745,7 +768,7 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, NopWAL(), 1000)
h, err := NewHead(nil, nil, nil, 1000)
testutil.Ok(t, err)
defer h.Close()
@ -786,7 +809,12 @@ func TestGCSeriesAccess(t *testing.T) {
}
func TestHead_LogRollback(t *testing.T) {
w := &memoryWAL{}
dir, err := ioutil.TempDir("", "wal_rollback")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.New(nil, nil, dir)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
@ -795,9 +823,11 @@ func TestHead_LogRollback(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, 1, len(w.entries))
recs := readTestWAL(t, w.Dir())
series, ok := w.entries[0].([]RefSeries)
testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0])
testutil.Equals(t, 1, len(recs))
series, ok := recs[0].([]RefSeries)
testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
}

213
record.go Normal file
View file

@ -0,0 +1,213 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"math"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
)
// RecordType represents the data type of a record.
type RecordType uint8
const (
RecordInvalid RecordType = 255
RecordSeries RecordType = 1
RecordSamples RecordType = 2
RecordTombstones RecordType = 3
)
type RecordLogger interface {
Log(recs ...[]byte) error
}
type RecordReader interface {
Next() bool
Err() error
Record() []byte
}
// RecordDecoder decodes series, sample, and tombstone records.
// The zero value is ready to use.
type RecordDecoder struct {
}
// Type returns the type of the record.
// Return RecordInvalid if no valid record type is found.
func (d *RecordDecoder) Type(rec []byte) RecordType {
if len(rec) < 1 {
return RecordInvalid
}
switch t := RecordType(rec[0]); t {
case RecordSeries, RecordSamples, RecordTombstones:
return t
}
return RecordInvalid
}
// Series appends series in rec to the given slice.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSeries {
return nil, errors.New("invalid record type")
}
for len(dec.b) > 0 && dec.err() == nil {
ref := dec.be64()
lset := make(labels.Labels, dec.uvarint())
for i := range lset {
lset[i].Name = dec.uvarintStr()
lset[i].Value = dec.uvarintStr()
}
sort.Sort(lset)
series = append(series, RefSeries{
Ref: ref,
Labels: lset,
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return series, nil
}
// Samples appends samples in rec to the given slice.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSamples {
return nil, errors.New("invalid record type")
}
if dec.len() == 0 {
return samples, nil
}
var (
baseRef = dec.be64()
baseTime = dec.be64int64()
)
for len(dec.b) > 0 && dec.err() == nil {
dref := dec.varint64()
dtime := dec.varint64()
val := dec.be64()
samples = append(samples, RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
})
}
if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return samples, nil
}
// Tombstones appends tombstones in rec to the given slice.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordTombstones {
return nil, errors.New("invalid record type")
}
for dec.len() > 0 && dec.err() == nil {
tstones = append(tstones, Stone{
ref: dec.be64(),
intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()},
},
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return tstones, nil
}
// RecordEncoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type RecordEncoder struct {
}
// Series appends the encoded series to b and returns the resulting slice.
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSeries))
for _, s := range series {
buf.putBE64(s.Ref)
buf.putUvarint(len(s.Labels))
for _, l := range s.Labels {
buf.putUvarintStr(l.Name)
buf.putUvarintStr(l.Value)
}
}
return buf.get()
}
// Samples appends the encoded samples to b and returns the resulting slice.
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSamples))
if len(samples) == 0 {
return buf.get()
}
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
first := samples[0]
buf.putBE64(first.Ref)
buf.putBE64int64(first.T)
for _, s := range samples {
buf.putVarint64(int64(s.Ref) - int64(first.Ref))
buf.putVarint64(s.T - first.T)
buf.putBE64(math.Float64bits(s.V))
}
return buf.get()
}
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordTombstones))
for _, s := range tstones {
for _, iv := range s.intervals {
buf.putBE64(s.ref)
buf.putVarint64(iv.Mint)
buf.putVarint64(iv.Maxt)
}
}
return buf.get()
}

73
record_test.go Normal file
View file

@ -0,0 +1,73 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"testing"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
func TestRecord_EncodeDecode(t *testing.T) {
var enc RecordEncoder
var dec RecordDecoder
series := []RefSeries{
{
Ref: 100,
Labels: labels.FromStrings("abc", "def", "123", "456"),
}, {
Ref: 1,
Labels: labels.FromStrings("abc", "def2", "1234", "4567"),
}, {
Ref: 435245,
Labels: labels.FromStrings("xyz", "def", "foo", "bar"),
},
}
decSeries, err := dec.Series(enc.Series(series, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, series, decSeries)
samples := []RefSample{
{Ref: 0, T: 12423423, V: 1.2345},
{Ref: 123, T: -1231, V: -123},
{Ref: 2, T: 0, V: 99999},
}
decSamples, err := dec.Samples(enc.Samples(samples, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, samples, decSamples)
// Intervals get split up into single entries. So we don't get back exactly
// what we put in.
tstones := []Stone{
{ref: 123, intervals: Intervals{
{Mint: -1000, Maxt: 1231231},
{Mint: 5000, Maxt: 0},
}},
{ref: 13, intervals: Intervals{
{Mint: -1000, Maxt: -11},
{Mint: 5000, Maxt: 1000},
}},
}
decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, []Stone{
{ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}},
{ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}},
{ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}},
{ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}},
}, decTstones)
}

View file

@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
}
// On DB opening all blocks in the base dir should be repaired.
db, _ := Open("testdata/repair_index_version", nil, nil, nil)
db, err := Open("testdata/repair_index_version", nil, nil, nil)
if err != nil {
t.Fatal(err)
}

102
wal.go
View file

@ -33,6 +33,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)
// WALEntryType indicates what data a WAL entry contains.
@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record codex instead.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
}
// SegmentWAL is a write ahead log for series data.
//
// DEPRECATED: use wal pkg combined with the record coders instead.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
}
return nil
}
// MigrateWAL rewrites the deprecated write ahead log into the new format.
func MigrateWAL(logger log.Logger, dir string) (err error) {
if logger == nil {
logger = log.NewNopLogger()
}
// Detect whether we still have the old WAL.
fns, err := sequenceFiles(dir)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "list sequence files")
}
if len(fns) == 0 {
return nil // No WAL at all yet.
}
// Check header of first segment to see whether we are still dealing with an
// old WAL.
f, err := os.Open(fns[0])
if err != nil {
return errors.Wrap(err, "check first existing segment")
}
defer f.Close()
var hdr [4]byte
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
return errors.Wrap(err, "read header from first segment")
}
// If we cannot read the magic header for segments of the old WAL, abort.
// Either it's migrated already or there's a corruption issue with which
// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
return nil
}
level.Info(logger).Log("msg", "migrating WAL format")
tmpdir := dir + ".tmp"
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
// It should've already been closed as part of the previous finalization.
// Do it once again in case of prior errors.
defer func() {
if err != nil {
repl.Close()
}
}()
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
if err != nil {
return errors.Wrap(err, "open old WAL")
}
defer w.Close()
rdr := w.Reader()
var (
enc RecordEncoder
b []byte
)
decErr := rdr.Read(
func(s []RefSeries) {
if err != nil {
return
}
err = repl.Log(enc.Series(s, b[:0]))
},
func(s []RefSample) {
if err != nil {
return
}
err = repl.Log(enc.Samples(s, b[:0]))
},
func(s []Stone) {
if err != nil {
return
}
err = repl.Log(enc.Tombstones(s, b[:0]))
},
)
if decErr != nil {
return errors.Wrap(err, "decode old entries")
}
if err != nil {
return errors.Wrap(err, "write new entries")
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
}
if err := fileutil.Replace(tmpdir, dir); err != nil {
return errors.Wrap(err, "replace old WAL")
}
return nil
}

822
wal/wal.go Normal file
View file

@ -0,0 +1,822 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
)
const (
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
pageSize = 32 * 1024 // 32KB
recordHeaderSize = 7
)
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
type page struct {
alloc int
flushed int
buf [pageSize]byte
}
func (p *page) remaining() int {
return pageSize - p.alloc
}
func (p *page) full() bool {
return pageSize-p.alloc < recordHeaderSize
}
// Segment represents a segment file.
type Segment struct {
*os.File
dir string
i int
}
// Index returns the index of the segment.
func (s *Segment) Index() int {
return s.i
}
// Dir returns the directory of the segment.
func (s *Segment) Dir() string {
return s.dir
}
// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
Segment int
Offset int
Err error
}
func (e *CorruptionErr) Error() string {
if e.Segment < 0 {
return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err)
}
return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err)
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(dir string, k int) (*Segment, error) {
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
stat, err := f.Stat()
if err != nil {
f.Close()
return nil, err
}
// If the last page is torn, fill it with zeros.
// In case it was torn after all records were written successfully, this
// will just pad the page and everything will be fine.
// If it was torn mid-record, a full read (which the caller should do anyway
// to ensure integrity) will detect it as a corruption by the end.
if d := stat.Size() % pageSize; d != 0 {
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
f.Close()
return nil, errors.Wrap(err, "zero-pad torn page")
}
}
return &Segment{File: f, i: k, dir: dir}, nil
}
// CreateSegment creates a new segment k in dir.
func CreateSegment(dir string, k int) (*Segment, error) {
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
return &Segment{File: f, i: k, dir: dir}, nil
}
// OpenReadSegment opens the segment with the given filename.
func OpenReadSegment(fn string) (*Segment, error) {
k, err := strconv.Atoi(filepath.Base(fn))
if err != nil {
return nil, errors.New("not a valid filename")
}
f, err := os.Open(fn)
if err != nil {
return nil, err
}
return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil
}
// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
dir string
logger log.Logger
segmentSize int
mtx sync.RWMutex
segment *Segment // active segment
donePages int // pages written to the segment
page *page // active page
stopc chan chan struct{}
actorc chan func()
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
}
// New returns a new WAL over the given directory.
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
return NewSize(logger, reg, dir, defaultSegmentSize)
}
// NewSize returns a new WAL over the given directory.
// New segments are created with the specified size.
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}
if logger == nil {
logger = log.NewNopLogger()
}
w := &WAL{
dir: dir,
logger: logger,
segmentSize: segmentSize,
page: &page{},
actorc: make(chan func(), 100),
stopc: make(chan chan struct{}),
}
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
})
w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_page_flushes_total",
Help: "Total number of page flushes.",
})
w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_completed_pages_total",
Help: "Total number of completed pages.",
})
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions)
}
_, j, err := w.Segments()
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// Fresh dir, no segments yet.
if j == -1 {
if w.segment, err = CreateSegment(w.dir, 0); err != nil {
return nil, err
}
} else {
if w.segment, err = OpenWriteSegment(w.dir, j); err != nil {
return nil, err
}
// Correctly initialize donePages.
stat, err := w.segment.Stat()
if err != nil {
return nil, err
}
w.donePages = int(stat.Size() / pageSize)
}
go w.run()
return w, nil
}
// Dir returns the directory of the WAL.
func (w *WAL) Dir() string {
return w.dir
}
func (w *WAL) run() {
Loop:
for {
select {
case f := <-w.actorc:
f()
case donec := <-w.stopc:
close(w.actorc)
defer close(donec)
break Loop
}
}
// Drain and process any remaining functions.
for f := range w.actorc {
f()
}
}
// Repair attempts to repair the WAL based on the error.
// It discards all data after the corruption.
func (w *WAL) Repair(err error) error {
// We could probably have a mode that only discards torn records right around
// the corruption to preserve as data much as possible.
// But that's not generally applicable if the records have any kind of causality.
// Maybe as an extra mode in the future if mid-WAL corruptions become
// a frequent concern.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.New("cannot handle error")
}
if cerr.Segment < 0 {
return errors.New("corruption error does not specify position")
}
level.Warn(w.logger).Log("msg", "starting corruption repair",
"segment", cerr.Segment, "offset", cerr.Offset)
// All segments behind the corruption can no longer be used.
segs, err := listSegments(w.dir)
if err != nil {
return errors.Wrap(err, "list segments")
}
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption")
for _, s := range segs {
if s.n <= cerr.Segment {
continue
}
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
return errors.Wrap(err, "delete segment")
}
}
// Regardless of the corruption offset, no record reaches into the previous segment.
// So we can safely repair the WAL by removing the segment and re-inserting all
// its records up to the corruption.
level.Warn(w.logger).Log("msg", "rewrite corrupted segment")
fn := SegmentName(w.dir, cerr.Segment)
tmpfn := fn + ".repair"
if err := fileutil.Rename(fn, tmpfn); err != nil {
return err
}
// Create a clean segment and make it the active one.
s, err := CreateSegment(w.dir, cerr.Segment)
if err != nil {
return err
}
w.segment = s
f, err := os.Open(tmpfn)
if err != nil {
return errors.Wrap(err, "open segment")
}
defer f.Close()
r := NewReader(bufio.NewReader(f))
for r.Next() {
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
}
}
// We expect an error here, so nothing to handle.
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
return nil
}
// SegmentName builds a segment name for the directory.
func SegmentName(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf("%08d", i))
}
// nextSegment creates the next segment and closes the previous one.
func (w *WAL) nextSegment() error {
// Only flush the current page if it actually holds data.
if w.page.alloc > 0 {
if err := w.flushPage(true); err != nil {
return err
}
}
next, err := CreateSegment(w.dir, w.segment.Index()+1)
if err != nil {
return errors.Wrap(err, "create new segment file")
}
prev := w.segment
w.segment = next
w.donePages = 0
// Don't block further writes by fsyncing the last segment.
w.actorc <- func() {
if err := w.fsync(prev); err != nil {
level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
}
if err := prev.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
}
return nil
}
// flushPage writes the new contents of the page to disk. If no more records will fit into
// the page, the remaining bytes will be set to zero and a new page will be started.
// If clear is true, this is enforced regardless of how many bytes are left in the page.
func (w *WAL) flushPage(clear bool) error {
w.pageFlushes.Inc()
p := w.page
clear = clear || p.full()
// No more data will fit into the page. Enqueue and clear it.
if clear {
p.alloc = pageSize // write till end of page
w.pageCompletions.Inc()
}
n, err := w.segment.Write(p.buf[p.flushed:p.alloc])
if err != nil {
return err
}
p.flushed += n
// We flushed an entire page, prepare a new one.
if clear {
for i := range p.buf {
p.buf[i] = 0
}
p.alloc = 0
p.flushed = 0
w.donePages++
}
return nil
}
type recType uint8
const (
recPageTerm recType = 0 // Rest of page is empty.
recFull recType = 1 // Full record.
recFirst recType = 2 // First fragment of a record.
recMiddle recType = 3 // Middle fragments of a record.
recLast recType = 4 // Final fragment of a record.
)
func (t recType) String() string {
switch t {
case recPageTerm:
return "zero"
case recFull:
return "full"
case recFirst:
return "first"
case recMiddle:
return "middle"
case recLast:
return "last"
default:
return "<invalid>"
}
}
func (w *WAL) pagesPerSegment() int {
return w.segmentSize / pageSize
}
// Log writes the records into the log.
// Multiple records can be passed at once to reduce writes and increase throughput.
func (w *WAL) Log(recs ...[]byte) error {
w.mtx.Lock()
defer w.mtx.Unlock()
// Callers could just implement their own list record format but adding
// a bit of extra logic here frees them from that overhead.
for i, r := range recs {
if err := w.log(r, i == len(recs)-1); err != nil {
return err
}
}
return nil
}
// log writes rec to the log and forces a flush of the current page if its
// the final record of a batch.
func (w *WAL) log(rec []byte, final bool) error {
// If the record is too big to fit within pages in the current
// segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries.
left := w.page.remaining() - recordHeaderSize // Active pages.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages.
if len(rec) > left {
if err := w.nextSegment(); err != nil {
return err
}
}
// Populate as many pages as necessary to fit the record.
// Be careful to always do one pass to ensure we write zero-length records.
for i := 0; i == 0 || len(rec) > 0; i++ {
p := w.page
// Find how much of the record we can fit into the page.
var (
l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
part = rec[:l]
buf = p.buf[p.alloc:]
typ recType
)
switch {
case i == 0 && len(part) == len(rec):
typ = recFull
case len(part) == len(rec):
typ = recLast
case i == 0:
typ = recFirst
default:
typ = recMiddle
}
buf[0] = byte(typ)
crc := crc32.Checksum(part, castagnoliTable)
binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
binary.BigEndian.PutUint32(buf[3:], crc)
copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize
// If we wrote a full record, we can fit more records of the batch
// into the page before flushing it.
if final || typ != recFull || w.page.full() {
if err := w.flushPage(false); err != nil {
return err
}
}
rec = rec[l:]
}
return nil
}
// Segments returns the range [m, n] of currently existing segments.
// If no segments are found, m and n are -1.
func (w *WAL) Segments() (m, n int, err error) {
refs, err := listSegments(w.dir)
if err != nil {
return 0, 0, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0].n, refs[len(refs)-1].n, nil
}
// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error {
refs, err := listSegments(w.dir)
if err != nil {
return err
}
for _, r := range refs {
if r.n >= i {
break
}
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err
}
}
return nil
}
func (w *WAL) fsync(f *Segment) error {
start := time.Now()
err := fileutil.Fsync(f.File)
w.fsyncDuration.Observe(time.Since(start).Seconds())
return err
}
// Close flushes all writes and closes active segment.
func (w *WAL) Close() (err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
// Flush the last page and zero out all its remaining size.
// We must not flush an empty page as it would falsely signal
// the segment is done if we start writing to it again after opening.
if w.page.alloc > 0 {
if err := w.flushPage(true); err != nil {
return err
}
}
donec := make(chan struct{})
w.stopc <- donec
<-donec
if err = w.fsync(w.segment); err != nil {
level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
}
if err := w.segment.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
return nil
}
type segmentRef struct {
s string
n int
}
func listSegments(dir string) (refs []segmentRef, err error) {
files, err := fileutil.ReadDir(dir)
if err != nil {
return nil, err
}
var last int
for _, fn := range files {
k, err := strconv.Atoi(fn)
if err != nil {
continue
}
if len(refs) > 0 && k > last+1 {
return nil, errors.New("segments are not sequential")
}
refs = append(refs, segmentRef{s: fn, n: k})
last = k
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].n < refs[j].n
})
return refs, nil
}
// NewSegmentsReader returns a new reader over all segments in the directory.
func NewSegmentsReader(dir string) (io.ReadCloser, error) {
return NewSegmentsRangeReader(dir, 0, math.MaxInt64)
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment range.
// If m or n are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
refs, err := listSegments(dir)
if err != nil {
return nil, err
}
var segs []*Segment
for _, r := range refs {
if m >= 0 && r.n < m {
continue
}
if n >= 0 && r.n > n {
break
}
s, err := OpenReadSegment(filepath.Join(dir, r.s))
if err != nil {
return nil, err
}
segs = append(segs, s)
}
return newSegmentBufReader(segs...), nil
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
// The main purpose is that we are able to track segment and offset for
// corruption reporting.
type segmentBufReader struct {
buf *bufio.Reader
segs []*Segment
cur int
off int
more bool
}
func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
return &segmentBufReader{
buf: bufio.NewReaderSize(nil, 16*pageSize),
segs: segs,
cur: -1,
}
}
func (r *segmentBufReader) Close() (err error) {
for _, s := range r.segs {
if e := s.Close(); e != nil {
err = e
}
}
return err
}
func (r *segmentBufReader) Read(b []byte) (n int, err error) {
if !r.more {
if r.cur+1 >= len(r.segs) {
return 0, io.EOF
}
r.cur++
r.off = 0
r.more = true
r.buf.Reset(r.segs[r.cur])
}
n, err = r.buf.Read(b)
r.off += n
if err != io.EOF {
return n, err
}
// Just return what we read so far, but don't signal EOF.
// Only unset more so we don't invalidate the current segment and
// offset before the next read.
r.more = false
return n, nil
}
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
buf [pageSize]byte
total int // total bytes processed.
}
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
return &Reader{rdr: r}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Cause(err) == io.EOF {
return false
}
r.err = err
return r.err == nil
}
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
i := 0
for {
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
typ := recType(hdr[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += n
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
r.total += n
var (
length = binary.BigEndian.Uint16(hdr[1:])
crc = binary.BigEndian.Uint32(hdr[3:])
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
r.total += n
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
r.rec = append(r.rec, buf[:length]...)
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record")
}
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record")
}
case recLast:
if i == 0 {
return errors.New("unexpected last record")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
// Err returns the last encountered error wrapped in a corruption error.
// If the reader does not allow to infer a segment index and offset, a total
// offset in the reader stream will be provided.
func (r *Reader) Err() error {
if r.err == nil {
return nil
}
if b, ok := r.rdr.(*segmentBufReader); ok {
return &CorruptionErr{
Err: r.err,
Segment: b.segs[b.cur].Index(),
Offset: b.off,
}
}
return &CorruptionErr{
Err: r.err,
Segment: -1,
Offset: r.total,
}
}
// Record returns the current record. The returned byte slice is only
// valid until the next call to Next.
func (r *Reader) Record() []byte {
return r.rec
}
func min(i, j int) int {
if i < j {
return i
}
return j
}

361
wal/wal_test.go Normal file
View file

@ -0,0 +1,361 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bytes"
"encoding/binary"
"hash/crc32"
"io/ioutil"
"math/rand"
"os"
"testing"
"github.com/prometheus/tsdb/testutil"
)
func encodedRecord(t recType, b []byte) []byte {
if t == recPageTerm {
return append([]byte{0}, b...)
}
r := make([]byte, recordHeaderSize)
r[0] = byte(t)
binary.BigEndian.PutUint16(r[1:], uint16(len(b)))
binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable))
return append(r, b...)
}
// TestReader feeds the reader a stream of encoded records with different types.
func TestReader(t *testing.T) {
data := make([]byte, 100000)
_, err := rand.Read(data)
testutil.Ok(t, err)
type record struct {
t recType
b []byte
}
cases := []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, 1000), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
for i, c := range cases {
t.Logf("test %d", i)
var buf []byte
for _, r := range c.t {
buf = append(buf, encodedRecord(r.t, r.b)...)
}
r := NewReader(bytes.NewReader(buf))
for j := 0; r.Next(); j++ {
t.Logf("record %d", j)
rec := r.Record()
if j >= len(c.exp) {
t.Fatal("received more records than inserted")
}
testutil.Equals(t, c.exp[j], rec)
}
if !c.fail && r.Err() != nil {
t.Fatalf("unexpected error: %s", r.Err())
}
if c.fail && r.Err() == nil {
t.Fatalf("expected error but got none")
}
}
}
func TestWAL_FuzzWriteRead(t *testing.T) {
const count = 25000
dir, err := ioutil.TempDir("", "walfuzz")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := NewSize(nil, nil, dir, 128*pageSize)
testutil.Ok(t, err)
var input [][]byte
var recs [][]byte
for i := 0; i < count; i++ {
var sz int
switch i % 5 {
case 0, 1:
sz = 50
case 2, 3:
sz = pageSize
default:
sz = 8 * pageSize
}
rec := make([]byte, rand.Intn(sz))
_, err := rand.Read(rec)
testutil.Ok(t, err)
input = append(input, rec)
recs = append(recs, rec)
// Randomly batch up records.
if rand.Intn(4) < 3 {
testutil.Ok(t, w.Log(recs...))
recs = recs[:0]
}
}
testutil.Ok(t, w.Log(recs...))
m, n, err := w.Segments()
testutil.Ok(t, err)
rc, err := NewSegmentsRangeReader(dir, m, n)
testutil.Ok(t, err)
defer rc.Close()
rdr := NewReader(rc)
for i := 0; rdr.Next(); i++ {
rec := rdr.Record()
if i >= len(input) {
t.Fatal("read too many records")
}
if !bytes.Equal(input[i], rec) {
t.Fatalf("record %d (len %d) does not match (expected len %d)",
i, len(rec), len(input[i]))
}
}
testutil.Ok(t, rdr.Err())
}
func TestWAL_Repair(t *testing.T) {
for name, cf := range map[string]func(f *os.File){
"bad_fragment_sequence": func(f *os.File) {
_, err := f.Seek(pageSize, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte{byte(recLast)})
testutil.Ok(t, err)
},
"bad_fragment_flag": func(f *os.File) {
_, err := f.Seek(pageSize, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte{123})
testutil.Ok(t, err)
},
"bad_checksum": func(f *os.File) {
_, err := f.Seek(pageSize+4, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte{0})
testutil.Ok(t, err)
},
"bad_length": func(f *os.File) {
_, err := f.Seek(pageSize+2, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte{0})
testutil.Ok(t, err)
},
"bad_content": func(f *os.File) {
_, err := f.Seek(pageSize+100, 0)
testutil.Ok(t, err)
_, err = f.Write([]byte("beef"))
testutil.Ok(t, err)
},
} {
t.Run(name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "wal_repair")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
// We create 3 segments with 3 records each and then corrupt the 2nd record
// of the 2nd segment.
// As a result we want a repaired WAL with the first 4 records intact.
w, err := NewSize(nil, nil, dir, 3*pageSize)
testutil.Ok(t, err)
var records [][]byte
for i := 1; i <= 9; i++ {
b := make([]byte, pageSize-recordHeaderSize)
b[0] = byte(i)
records = append(records, b)
testutil.Ok(t, w.Log(b))
}
testutil.Ok(t, w.Close())
f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666)
testutil.Ok(t, err)
// Apply corruption function.
cf(f)
testutil.Ok(t, f.Close())
w, err = New(nil, nil, dir)
testutil.Ok(t, err)
sr, err := NewSegmentsReader(dir)
testutil.Ok(t, err)
r := NewReader(sr)
for r.Next() {
}
testutil.NotOk(t, r.Err())
testutil.Ok(t, w.Repair(r.Err()))
sr, err = NewSegmentsReader(dir)
testutil.Ok(t, err)
r = NewReader(sr)
var result [][]byte
for r.Next() {
var b []byte
result = append(result, append(b, r.Record()...))
}
testutil.Ok(t, r.Err())
testutil.Equals(t, 4, len(result))
for i, r := range result {
if !bytes.Equal(records[i], r) {
t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10])
}
}
})
}
}
func BenchmarkWAL_LogBatched(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_logbatch")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
w, err := New(nil, nil, "testdir")
testutil.Ok(b, err)
defer w.Close()
var buf [2048]byte
var recs [][]byte
b.SetBytes(2048)
for i := 0; i < b.N; i++ {
recs = append(recs, buf[:])
if len(recs) < 1000 {
continue
}
err := w.Log(recs...)
testutil.Ok(b, err)
recs = recs[:0]
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b.StopTimer()
}
func BenchmarkWAL_Log(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_logsingle")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
w, err := New(nil, nil, "testdir")
testutil.Ok(b, err)
defer w.Close()
var buf [2048]byte
b.SetBytes(2048)
for i := 0; i < b.N; i++ {
err := w.Log(buf[:])
testutil.Ok(b, err)
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b.StopTimer()
}

View file

@ -19,6 +19,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"
"testing"
"time"
@ -26,6 +27,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func TestSegmentWAL_cut(t *testing.T) {
@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) {
})
}
}
func TestMigrateWAL_Empty(t *testing.T) {
// The migration proecedure must properly deal with a zero-length segment,
// which is valid in the new format.
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal")
// Initialize empty WAL.
w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
testutil.Ok(t, MigrateWAL(nil, wdir))
}
func TestMigrateWAL_Fuzz(t *testing.T) {
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
wdir := path.Join(dir, "wal")
// Should pass if no WAL exists yet.
testutil.Ok(t, MigrateWAL(nil, wdir))
oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
testutil.Ok(t, err)
// Write some data.
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 1, T: 100, V: 200},
{Ref: 2, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 3, T: 100, V: 200},
{Ref: 4, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
{ref: 1, intervals: []Interval{{100, 200}}},
}))
testutil.Ok(t, oldWAL.Close())
// Perform migration.
testutil.Ok(t, MigrateWAL(nil, wdir))
w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)
// We can properly write some new data after migration.
var enc RecordEncoder
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
{Ref: 500, T: 1, V: 1},
}, nil)))
testutil.Ok(t, w.Close())
// Read back all data.
sr, err := wal.NewSegmentsReader(wdir)
testutil.Ok(t, err)
r := wal.NewReader(sr)
var res []interface{}
var dec RecordDecoder
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
s, err := dec.Series(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordSamples:
s, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordTombstones:
s, err := dec.Tombstones(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
default:
t.Fatalf("unknown record type %d", dec.Type(rec))
}
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []interface{}{
[]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
},
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
[]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
},
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
[]RefSample{{Ref: 500, T: 1, V: 1}},
}, res)
// Migrating an already migrated WAL shouldn't do anything.
testutil.Ok(t, MigrateWAL(nil, wdir))
}