Add checkpointing of WAL segments

Create checkpoints from a sequence of WAL segments while filtering
out obsolete data. The checkpoint format is again a sequence of WAL
segments, which allows us to reuse the serialization format and
implementation.

Signed-off-by: Fabian Reinartz <freinartz@google.com>
This commit is contained in:
Fabian Reinartz 2018-05-17 09:02:47 -04:00
parent 449a2d0db7
commit 008399a6e0
5 changed files with 770 additions and 0 deletions

279
checkpoint.go Normal file
View file

@ -0,0 +1,279 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
// CheckpointStats returns stats about a created checkpoint.
type CheckpointStats struct {
DroppedSeries int
DroppedSamples int
DroppedTombstones int
TotalSeries int
TotalSamples int
TotalTombstones int
}
// LastCheckpoint returns the directory name of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
// Traverse list backwards since there may be multiple checkpoints left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return fi.Name(), k, nil
}
return "", 0, ErrNotFound
}
// DeleteCheckpoints deletes all checkpoints in dir that have an index
// below n.
func DeleteCheckpoints(dir string, n int) error {
var errs MultiError
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{}
var sr io.Reader
{
lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
}
if err == nil {
if m > k+1 {
return nil, errors.New("unexpected gap to last checkpoint")
}
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1
last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
sr = last
}
segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
}
defer segs.Close()
if sr != nil {
sr = io.MultiReader(sr, segs)
} else {
sr = segs
}
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
r := wal.NewReader(sr)
var (
series []RefSeries
samples []RefSample
tstones []Stone
dec RecordDecoder
enc RecordEncoder
buf []byte
recs [][]byte
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
// Remember where the record for this iteration starts.
start := len(buf)
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
if err != nil {
return nil, errors.Wrap(err, "decode series")
}
// Drop irrelevant series in place.
repl := series[:0]
for _, s := range series {
if keep(s.Ref) {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Series(repl, buf)
}
stats.TotalSeries += len(series)
stats.DroppedSeries += len(series) - len(repl)
case RecordSamples:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, errors.Wrap(err, "decode samples")
}
// Drop irrelevant samples in place.
repl := samples[:0]
for _, s := range samples {
if s.T >= mint {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Samples(repl, buf)
}
stats.TotalSamples += len(samples)
stats.DroppedSamples += len(samples) - len(repl)
case RecordTombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return nil, errors.Wrap(err, "decode deletes")
}
// Drop irrelevant tombstones in place.
repl := tstones[:0]
for _, s := range tstones {
for _, iv := range s.intervals {
if iv.Maxt >= mint {
repl = append(repl, s)
break
}
}
}
if len(repl) > 0 {
buf = enc.Tombstones(repl, buf)
}
stats.TotalTombstones += len(tstones)
stats.DroppedTombstones += len(tstones) - len(repl)
default:
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
continue // All contents discarded.
}
recs = append(recs, buf[start:])
// Flush records in 1 MB increments.
if len(buf) > 1*1024*1024 {
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
buf, recs = buf[:0], recs[:0]
}
}
// If we hit any corruption during checkpointing, repairing is not an option.
// The head won't know which series records are lost.
if r.Err() != nil {
return nil, errors.Wrap(r.Err(), "read segments")
}
// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
}
if err := fileutil.Rename(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint file")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil
}

182
checkpoint_test.go Normal file
View file

@ -0,0 +1,182 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)
func TestLastCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
s, k, err := LastCheckpoint(dir)
testutil.Equals(t, ErrNotFound, err)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s)
testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.0000", s)
testutil.Equals(t, 0, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1", s)
testutil.Equals(t, 1, k)
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777))
s, k, err = LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, "checkpoint.1000", s)
testutil.Equals(t, 1000, k)
}
func TestDeleteCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
testutil.Ok(t, DeleteCheckpoints(dir, 0))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0777))
testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0777))
testutil.Ok(t, DeleteCheckpoints(dir, 2))
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files)
}
func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
fmt.Println(dir)
var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())
// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
testutil.Ok(t, err)
// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))
last += 100
}
testutil.Ok(t, w.Close())
stats, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
testutil.Equals(t, 106, stats.HighSegment)
// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
defer sr.Close()
var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
}
}
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

View file

@ -6,6 +6,7 @@ package fileutil
import ( import (
"os" "os"
"path/filepath"
"sort" "sort"
) )
@ -23,3 +24,25 @@ func ReadDir(dirpath string) ([]string, error) {
sort.Strings(names) sort.Strings(names)
return names, nil return names, nil
} }
// Rename safely renames a file.
func Rename(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return err
}
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}

213
record.go Normal file
View file

@ -0,0 +1,213 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"math"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
)
// RecordType represents the data type of a record.
type RecordType uint8
const (
RecordInvalid RecordType = 255
RecordSeries RecordType = 1
RecordSamples RecordType = 2
RecordTombstones RecordType = 3
)
type RecordLogger interface {
Log(recs ...[]byte) error
}
type RecordReader interface {
Next() bool
Err() error
Record() []byte
}
// RecordDecoder decodes series, sample, and tombstone records.
// The zero value is ready to use.
type RecordDecoder struct {
}
// Type returns the type of the record.
// Return RecordInvalid if no valid record type is found.
func (d *RecordDecoder) Type(rec []byte) RecordType {
if len(rec) < 1 {
return RecordInvalid
}
switch t := RecordType(rec[0]); t {
case RecordSeries, RecordSamples, RecordTombstones:
return t
}
return RecordInvalid
}
// Series appends series in rec to the given slice.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSeries {
return nil, errors.New("invalid record type")
}
for len(dec.b) > 0 && dec.err() == nil {
ref := dec.be64()
lset := make(labels.Labels, dec.uvarint())
for i := range lset {
lset[i].Name = dec.uvarintStr()
lset[i].Value = dec.uvarintStr()
}
sort.Sort(lset)
series = append(series, RefSeries{
Ref: ref,
Labels: lset,
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return series, nil
}
// Samples appends samples in rec to the given slice.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSamples {
return nil, errors.New("invalid record type")
}
if dec.len() == 0 {
return samples, nil
}
var (
baseRef = dec.be64()
baseTime = dec.be64int64()
)
for len(dec.b) > 0 && dec.err() == nil {
dref := dec.varint64()
dtime := dec.varint64()
val := dec.be64()
samples = append(samples, RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
})
}
if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return samples, nil
}
// Tombstones appends tombstones in rec to the given slice.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordTombstones {
return nil, errors.New("invalid record type")
}
for dec.len() > 0 && dec.err() == nil {
tstones = append(tstones, Stone{
ref: dec.be64(),
intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()},
},
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return tstones, nil
}
// RecordEncoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type RecordEncoder struct {
}
// Series appends the encoded series to b and returns the resulting slice.
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSeries))
for _, s := range series {
buf.putBE64(s.Ref)
buf.putUvarint(len(s.Labels))
for _, l := range s.Labels {
buf.putUvarintStr(l.Name)
buf.putUvarintStr(l.Value)
}
}
return buf.get()
}
// Samples appends the encoded samples to b and returns the resulting slice.
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSamples))
if len(samples) == 0 {
return buf.get()
}
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
first := samples[0]
buf.putBE64(first.Ref)
buf.putBE64int64(first.T)
for _, s := range samples {
buf.putVarint64(int64(s.Ref) - int64(first.Ref))
buf.putVarint64(s.T - first.T)
buf.putBE64(math.Float64bits(s.V))
}
return buf.get()
}
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordTombstones))
for _, s := range tstones {
for _, iv := range s.intervals {
buf.putBE64(s.ref)
buf.putVarint64(iv.Mint)
buf.putVarint64(iv.Maxt)
}
}
return buf.get()
}

73
record_test.go Normal file
View file

@ -0,0 +1,73 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"testing"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
func TestRecord_EncodeDecode(t *testing.T) {
var enc RecordEncoder
var dec RecordDecoder
series := []RefSeries{
{
Ref: 100,
Labels: labels.FromStrings("abc", "def", "123", "456"),
}, {
Ref: 1,
Labels: labels.FromStrings("abc", "def2", "1234", "4567"),
}, {
Ref: 435245,
Labels: labels.FromStrings("xyz", "def", "foo", "bar"),
},
}
decSeries, err := dec.Series(enc.Series(series, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, series, decSeries)
samples := []RefSample{
{Ref: 0, T: 12423423, V: 1.2345},
{Ref: 123, T: -1231, V: -123},
{Ref: 2, T: 0, V: 99999},
}
decSamples, err := dec.Samples(enc.Samples(samples, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, samples, decSamples)
// Intervals get split up into single entries. So we don't get back exactly
// what we put in.
tstones := []Stone{
{ref: 123, intervals: Intervals{
{Mint: -1000, Maxt: 1231231},
{Mint: 5000, Maxt: 0},
}},
{ref: 13, intervals: Intervals{
{Mint: -1000, Maxt: -11},
{Mint: 5000, Maxt: 1000},
}},
}
decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil)
testutil.Ok(t, err)
testutil.Equals(t, []Stone{
{ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}},
{ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}},
{ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}},
{ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}},
}, decTstones)
}