Make HeadBlock use WAL.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-23 16:15:16 +05:30
parent 244b73fce1
commit 3eb4119ab1
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
4 changed files with 238 additions and 87 deletions

View file

@ -260,6 +260,9 @@ Outer:
if maxtime > maxt {
maxtime = maxt
}
if mint < chunks[0].MinTime {
mint = chunks[0].MinTime
}
delStones[p.At()] = intervals{{mint, maxtime}}
continue Outer
}

72
head.go
View file

@ -100,11 +100,6 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) {
return "", err
}
// Write an empty tombstones file.
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
return "", err
}
return dir, renameFile(tmp, dir)
}
@ -131,16 +126,19 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
func (h *HeadBlock) init() error {
r := h.wal.Reader()
for r.Next() {
series, samples := r.At()
seriesFunc := func(series []labels.Labels) error {
for _, lset := range series {
h.create(lset.Hash(), lset)
h.meta.Stats.NumSeries++
}
return nil
}
samplesFunc := func(samples []RefSample) error {
for _, s := range samples {
if int(s.Ref) >= len(h.series) {
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore",
s.Ref, len(h.series))
}
h.series[s.Ref].append(s.T, s.V)
@ -149,22 +147,26 @@ func (h *HeadBlock) init() error {
}
h.meta.Stats.NumSamples++
}
return nil
}
if err := r.Err(); err != nil {
deletesFunc := func(stones []stone) error {
for _, s := range stones {
for _, itv := range s.intervals {
// TODO(gouthamve): Recheck.
h.tombstones.stones[s.ref].add(itv)
}
}
return nil
}
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
return errors.Wrap(err, "consume WAL")
}
h.tombstones = newMapTombstoneReader(h.tombstones.stones)
tr, err := readTombstoneFile(h.dir)
if err != nil {
return errors.Wrap(err, "read tombstones file")
}
for tr.Next() {
s := tr.At()
h.tombstones.refs = append(h.tombstones.refs, s.ref)
h.tombstones.stones[s.ref] = s.intervals
}
return errors.Wrap(err, "tombstones reader iteration")
return nil
}
// inBounds returns true if the given timestamp is within the valid
@ -230,6 +232,7 @@ func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
pr := newPostingsReader(ir)
p, absent := pr.Select(ms...)
newStones := make(map[uint32]intervals)
Outer:
for p.Next() {
ref := p.At()
@ -245,15 +248,26 @@ Outer:
if maxtime > maxt {
maxtime = maxt
}
h.tombstones.stones[ref] = h.tombstones.stones[ref].add(interval{mint, maxtime})
if mint < h.series[ref].chunks[0].minTime {
mint = h.series[ref].chunks[0].minTime
}
newStones[ref] = intervals{{mint, maxtime}}
}
if p.Err() != nil {
return p.Err()
}
if err := h.wal.LogDeletes(newMapTombstoneReader(newStones)); err != nil {
return err
}
for k, v := range newStones {
h.tombstones.stones[k] = h.tombstones.stones[k].add(v[0])
}
h.tombstones = newMapTombstoneReader(h.tombstones.stones)
return writeTombstoneFile(h.dir, h.tombstones.Copy())
return nil
}
// Dir returns the directory of the block.
@ -486,6 +500,7 @@ func (a *headAppender) createSeries() {
func (a *headAppender) Commit() error {
defer atomic.AddUint64(&a.activeWriters, ^uint64(0))
defer putHeadAppendBuffer(a.samples)
defer a.mtx.RUnlock()
a.createSeries()
@ -497,11 +512,14 @@ func (a *headAppender) Commit() error {
}
}
var err MultiError
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
a.mtx.RUnlock()
return err
err.Add(a.wal.LogSeries(a.newLabels))
err.Add(a.wal.LogSamples(a.samples))
if err.Err() != nil {
return err.Err()
}
total := uint64(len(a.samples))
@ -512,8 +530,6 @@ func (a *headAppender) Commit() error {
}
}
a.mtx.RUnlock()
atomic.AddUint64(&a.meta.Stats.NumSamples, total)
atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))

154
wal.go
View file

@ -46,8 +46,18 @@ const (
WALEntrySymbols WALEntryType = 1
WALEntrySeries WALEntryType = 2
WALEntrySamples WALEntryType = 3
WALEntryDeletes WALEntryType = 4
)
// SamplesCB yolo.
type SamplesCB func([]RefSample) error
// SeriesCB yolo.
type SeriesCB func([]labels.Labels) error
// DeletesCB yolo.
type DeletesCB func([]stone) error
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
@ -71,15 +81,15 @@ type SegmentWAL struct {
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
Log([]labels.Labels, []RefSample) error
LogSeries([]labels.Labels) error
LogSamples([]RefSample) error
LogDeletes(TombstoneReader) error
Close() error
}
// WALReader reads entries from a WAL.
type WALReader interface {
At() ([]labels.Labels, []RefSample)
Next() bool
Err() error
Read(SeriesCB, SamplesCB, DeletesCB) error
}
// RefSample is a timestamp/value pair associated with a reference to a series.
@ -141,13 +151,40 @@ func (w *SegmentWAL) Reader() WALReader {
}
// Log writes a batch of new series labels and samples to the log.
func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
//return nil
//}
// LogSeries writes a batch of new series labels to the log.
func (w *SegmentWAL) LogSeries(series []labels.Labels) error {
if err := w.encodeSeries(series); err != nil {
return err
}
if w.flushInterval <= 0 {
return w.Sync()
}
return nil
}
// LogSamples writes a batch of new samples to the log.
func (w *SegmentWAL) LogSamples(samples []RefSample) error {
if err := w.encodeSamples(samples); err != nil {
return err
}
if w.flushInterval <= 0 {
return w.Sync()
}
return nil
}
// LogDeletes write a batch of new deletes to the log.
func (w *SegmentWAL) LogDeletes(tr TombstoneReader) error {
if err := w.encodeDeletes(tr); err != nil {
return err
}
if w.flushInterval <= 0 {
return w.Sync()
}
@ -369,6 +406,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
const (
walSeriesSimple = 1
walSamplesSimple = 1
walDeletesSimple = 1
)
var walBuffers = sync.Pool{}
@ -445,6 +483,27 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
return w.entry(WALEntrySamples, walSamplesSimple, buf)
}
func (w *SegmentWAL) encodeDeletes(tr TombstoneReader) error {
b := make([]byte, 2*binary.MaxVarintLen64)
eb := &encbuf{b: b}
buf := getWALBuffer()
for tr.Next() {
eb.reset()
s := tr.At()
eb.putUvarint32(s.ref)
eb.putUvarint(len(s.intervals))
buf = append(buf, eb.get()...)
for _, itv := range s.intervals {
eb.reset()
eb.putVarint64(itv.mint)
eb.putVarint64(itv.maxt)
buf = append(buf, eb.get()...)
}
}
return w.entry(WALEntryDeletes, walDeletesSimple, buf)
}
// walReader decodes and emits write ahead log entries.
type walReader struct {
logger log.Logger
@ -454,9 +513,15 @@ type walReader struct {
buf []byte
crc32 hash.Hash32
err error
labels []labels.Labels
samples []RefSample
series []labels.Labels
stones []stone
samplesFunc SamplesCB
seriesFunc SeriesCB
deletesFunc DeletesCB
err error
}
func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
@ -471,18 +536,22 @@ func newWALReader(w *SegmentWAL, l log.Logger) *walReader {
}
}
// At returns the last decoded entry of labels or samples.
// The returned slices are only valid until the next call to Next(). Their elements
// have to be copied to preserve them.
func (r *walReader) At() ([]labels.Labels, []RefSample) {
return r.labels, r.samples
}
// Err returns the last error the reader encountered.
func (r *walReader) Err() error {
return r.err
}
func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error {
r.samplesFunc = samplesf
r.seriesFunc = seriesf
r.deletesFunc = deletesf
for r.next() {
}
return r.Err()
}
// nextEntry retrieves the next entry. It is also used as a testing hook.
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
if r.cur >= len(r.wal.files) {
@ -505,11 +574,12 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
return et, flag, b, err
}
// Next returns decodes the next entry pair and returns true
// next returns decodes the next entry pair and returns true
// if it was succesful.
func (r *walReader) Next() bool {
r.labels = r.labels[:0]
func (r *walReader) next() bool {
r.series = r.series[:0]
r.samples = r.samples[:0]
r.stones = r.stones[:0]
if r.cur >= len(r.wal.files) {
return false
@ -537,7 +607,7 @@ func (r *walReader) Next() bool {
return false
}
r.cur++
return r.Next()
return r.next()
}
if err != nil {
r.err = err
@ -550,16 +620,13 @@ func (r *walReader) Next() bool {
// In decoding below we never return a walCorruptionErr for now.
// Those should generally be catched by entry decoding before.
switch et {
case WALEntrySamples:
if err := r.decodeSamples(flag, b); err != nil {
r.err = err
}
case WALEntrySeries:
if err := r.decodeSeries(flag, b); err != nil {
r.err = err
}
r.err = r.decodeSeries(flag, b)
case WALEntrySamples:
r.err = r.decodeSamples(flag, b)
case WALEntryDeletes:
r.err = r.decodeDeletes(flag, b)
}
return r.err == nil
}
@ -617,7 +684,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
if etype == 0 {
return 0, 0, nil, io.EOF
}
if etype != WALEntrySeries && etype != WALEntrySamples {
if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes {
return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype)
}
@ -669,12 +736,14 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error {
b = b[n+int(vl):]
}
r.labels = append(r.labels, lset)
r.series = append(r.series, lset)
}
return nil
return r.seriesFunc(r.series)
}
func (r *walReader) decodeSamples(flag byte, b []byte) error {
r.samples = r.samples[:]
if len(b) < 16 {
return errors.Wrap(errInvalidSize, "header length")
}
@ -710,5 +779,30 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {
r.samples = append(r.samples, smpl)
}
return nil
return r.samplesFunc(r.samples)
}
func (r *walReader) decodeDeletes(flag byte, b []byte) error {
db := &decbuf{b: b}
r.samples = r.samples[:]
for db.len() > 0 {
var s stone
s.ref = uint32(db.uvarint())
l := db.uvarint()
if db.err() != nil {
return db.err()
}
for i := 0; i < l; i++ {
s.intervals = append(s.intervals, interval{db.varint64(), db.varint64()})
if db.err() != nil {
return db.err()
}
}
r.stones = append(r.stones, s)
}
return r.deletesFunc(r.stones)
}

View file

@ -149,6 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
var (
recordedSeries [][]labels.Labels
recordedSamples [][]RefSample
recordedDeletes [][]stone
)
var totalSamples int
@ -166,32 +167,51 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
var (
resultSeries [][]labels.Labels
resultSamples [][]RefSample
resultDeletes [][]stone
)
for r.Next() {
lsets, smpls := r.At()
serf := func(lsets []labels.Labels) error {
if len(lsets) > 0 {
clsets := make([]labels.Labels, len(lsets))
copy(clsets, lsets)
resultSeries = append(resultSeries, clsets)
}
return nil
}
smplf := func(smpls []RefSample) error {
if len(smpls) > 0 {
csmpls := make([]RefSample, len(smpls))
copy(csmpls, smpls)
resultSamples = append(resultSamples, csmpls)
}
return nil
}
require.NoError(t, r.Err())
// TODO: Add this.
delf := func(stones []stone) error {
if len(stones) > 0 {
cstones := make([]stone, len(stones))
copy(cstones, stones)
resultDeletes = append(resultDeletes, cstones)
}
return nil
}
require.NoError(t, r.Read(serf, smplf, delf))
require.Equal(t, recordedSamples, resultSamples)
require.Equal(t, recordedSeries, resultSeries)
require.Equal(t, recordedDeletes, resultDeletes)
series := series[k : k+(numMetrics/iterations)]
// Insert in batches and generate different amounts of samples for each.
for i := 0; i < len(series); i += stepSize {
var samples []RefSample
stones := map[uint32]intervals{}
for j := 0; j < i*10; j++ {
samples = append(samples, RefSample{
@ -201,9 +221,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
})
}
for j := 0; j < i*20; j++ {
ts := rand.Int63()
stones[rand.Uint32()] = intervals{{ts, ts + rand.Int63n(10000)}}
}
lbls := series[i : i+stepSize]
require.NoError(t, w.Log(lbls, samples))
require.NoError(t, w.LogSeries(lbls))
require.NoError(t, w.LogSamples(samples))
require.NoError(t, w.LogDeletes(newMapTombstoneReader(stones)))
if len(lbls) > 0 {
recordedSeries = append(recordedSeries, lbls)
@ -212,6 +239,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
recordedSamples = append(recordedSamples, samples)
totalSamples += len(samples)
}
if len(stones) > 0 {
tr := newMapTombstoneReader(stones)
newdels := []stone{}
for tr.Next() {
newdels = append(newdels, tr.At())
}
require.NoError(t, tr.Err())
recordedDeletes = append(recordedDeletes, newdels)
}
}
require.NoError(t, w.Close())
@ -292,13 +329,13 @@ func TestWALRestoreCorrupted(t *testing.T) {
w, err := OpenSegmentWAL(dir, nil, 0)
require.NoError(t, err)
require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}}))
require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}}))
require.NoError(t, w.cut())
require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}}))
require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}}))
require.NoError(t, w.Close())
@ -314,17 +351,28 @@ func TestWALRestoreCorrupted(t *testing.T) {
require.NoError(t, err)
r := w2.Reader()
serf := func(l []labels.Labels) error {
require.Equal(t, 0, len(l))
return nil
}
delf := func([]stone) error { return nil }
require.True(t, r.Next())
l, s := r.At()
require.Equal(t, 0, len(l))
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
// Weird hack to check order of reads.
i := 0
samplf := func(s []RefSample) error {
if i == 0 {
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
i++
} else {
require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
}
// Truncation should happen transparently and not cause an error.
require.False(t, r.Next())
require.Nil(t, r.Err())
return nil
}
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
require.NoError(t, r.Read(serf, samplf, delf))
require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}}))
require.NoError(t, w2.Close())
// We should see the first valid entry and the new one, everything after
@ -334,18 +382,8 @@ func TestWALRestoreCorrupted(t *testing.T) {
r = w3.Reader()
require.True(t, r.Next())
l, s = r.At()
require.Equal(t, 0, len(l))
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
require.True(t, r.Next())
l, s = r.At()
require.Equal(t, 0, len(l))
require.Equal(t, []RefSample{{T: 99, V: 100}}, s)
require.False(t, r.Next())
require.Nil(t, r.Err())
i = 0
require.NoError(t, r.Read(serf, samplf, delf))
})
}
}