From 3eb4119ab1e643b4f9277a4dd18a52acab906eac Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 23 May 2017 16:15:16 +0530 Subject: [PATCH] Make HeadBlock use WAL. Signed-off-by: Goutham Veeramachaneni --- block.go | 3 + head.go | 72 ++++++++++++++---------- wal.go | 154 ++++++++++++++++++++++++++++++++++++++++++---------- wal_test.go | 96 ++++++++++++++++++++++---------- 4 files changed, 238 insertions(+), 87 deletions(-) diff --git a/block.go b/block.go index a134acf43..41e5c8b39 100644 --- a/block.go +++ b/block.go @@ -260,6 +260,9 @@ Outer: if maxtime > maxt { maxtime = maxt } + if mint < chunks[0].MinTime { + mint = chunks[0].MinTime + } delStones[p.At()] = intervals{{mint, maxtime}} continue Outer } diff --git a/head.go b/head.go index db7becfb8..2eae0952e 100644 --- a/head.go +++ b/head.go @@ -100,11 +100,6 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { return "", err } - // Write an empty tombstones file. - if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { - return "", err - } - return dir, renameFile(tmp, dir) } @@ -131,16 +126,19 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { func (h *HeadBlock) init() error { r := h.wal.Reader() - for r.Next() { - series, samples := r.At() - + seriesFunc := func(series []labels.Labels) error { for _, lset := range series { h.create(lset.Hash(), lset) h.meta.Stats.NumSeries++ } + + return nil + } + samplesFunc := func(samples []RefSample) error { for _, s := range samples { if int(s.Ref) >= len(h.series) { - return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) + return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", + s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) @@ -149,22 +147,26 @@ func (h *HeadBlock) init() error { } h.meta.Stats.NumSamples++ } + + return nil } - if err := r.Err(); err != nil { + deletesFunc := func(stones []stone) error { + for _, s := range stones { + for _, itv := range s.intervals { + // TODO(gouthamve): Recheck. + h.tombstones.stones[s.ref].add(itv) + } + } + + return nil + } + + if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { return errors.Wrap(err, "consume WAL") } + h.tombstones = newMapTombstoneReader(h.tombstones.stones) - tr, err := readTombstoneFile(h.dir) - if err != nil { - return errors.Wrap(err, "read tombstones file") - } - - for tr.Next() { - s := tr.At() - h.tombstones.refs = append(h.tombstones.refs, s.ref) - h.tombstones.stones[s.ref] = s.intervals - } - return errors.Wrap(err, "tombstones reader iteration") + return nil } // inBounds returns true if the given timestamp is within the valid @@ -230,6 +232,7 @@ func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { pr := newPostingsReader(ir) p, absent := pr.Select(ms...) + newStones := make(map[uint32]intervals) Outer: for p.Next() { ref := p.At() @@ -245,15 +248,26 @@ Outer: if maxtime > maxt { maxtime = maxt } - h.tombstones.stones[ref] = h.tombstones.stones[ref].add(interval{mint, maxtime}) + if mint < h.series[ref].chunks[0].minTime { + mint = h.series[ref].chunks[0].minTime + } + + newStones[ref] = intervals{{mint, maxtime}} } if p.Err() != nil { return p.Err() } + if err := h.wal.LogDeletes(newMapTombstoneReader(newStones)); err != nil { + return err + } + for k, v := range newStones { + h.tombstones.stones[k] = h.tombstones.stones[k].add(v[0]) + } h.tombstones = newMapTombstoneReader(h.tombstones.stones) - return writeTombstoneFile(h.dir, h.tombstones.Copy()) + + return nil } // Dir returns the directory of the block. @@ -486,6 +500,7 @@ func (a *headAppender) createSeries() { func (a *headAppender) Commit() error { defer atomic.AddUint64(&a.activeWriters, ^uint64(0)) defer putHeadAppendBuffer(a.samples) + defer a.mtx.RUnlock() a.createSeries() @@ -497,11 +512,14 @@ func (a *headAppender) Commit() error { } } + var err MultiError + // Write all new series and samples to the WAL and add it to the // in-mem database on success. - if err := a.wal.Log(a.newLabels, a.samples); err != nil { - a.mtx.RUnlock() - return err + err.Add(a.wal.LogSeries(a.newLabels)) + err.Add(a.wal.LogSamples(a.samples)) + if err.Err() != nil { + return err.Err() } total := uint64(len(a.samples)) @@ -512,8 +530,6 @@ func (a *headAppender) Commit() error { } } - a.mtx.RUnlock() - atomic.AddUint64(&a.meta.Stats.NumSamples, total) atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries))) diff --git a/wal.go b/wal.go index 63975de2a..6de9f73ad 100644 --- a/wal.go +++ b/wal.go @@ -46,8 +46,18 @@ const ( WALEntrySymbols WALEntryType = 1 WALEntrySeries WALEntryType = 2 WALEntrySamples WALEntryType = 3 + WALEntryDeletes WALEntryType = 4 ) +// SamplesCB yolo. +type SamplesCB func([]RefSample) error + +// SeriesCB yolo. +type SeriesCB func([]labels.Labels) error + +// DeletesCB yolo. +type DeletesCB func([]stone) error + // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { mtx sync.Mutex @@ -71,15 +81,15 @@ type SegmentWAL struct { // It must be completely read before new entries are logged. type WAL interface { Reader() WALReader - Log([]labels.Labels, []RefSample) error + LogSeries([]labels.Labels) error + LogSamples([]RefSample) error + LogDeletes(TombstoneReader) error Close() error } // WALReader reads entries from a WAL. type WALReader interface { - At() ([]labels.Labels, []RefSample) - Next() bool - Err() error + Read(SeriesCB, SamplesCB, DeletesCB) error } // RefSample is a timestamp/value pair associated with a reference to a series. @@ -141,13 +151,40 @@ func (w *SegmentWAL) Reader() WALReader { } // Log writes a batch of new series labels and samples to the log. -func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { +//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { +//return nil +//} + +// LogSeries writes a batch of new series labels to the log. +func (w *SegmentWAL) LogSeries(series []labels.Labels) error { if err := w.encodeSeries(series); err != nil { return err } + + if w.flushInterval <= 0 { + return w.Sync() + } + return nil +} + +// LogSamples writes a batch of new samples to the log. +func (w *SegmentWAL) LogSamples(samples []RefSample) error { if err := w.encodeSamples(samples); err != nil { return err } + + if w.flushInterval <= 0 { + return w.Sync() + } + return nil +} + +// LogDeletes write a batch of new deletes to the log. +func (w *SegmentWAL) LogDeletes(tr TombstoneReader) error { + if err := w.encodeDeletes(tr); err != nil { + return err + } + if w.flushInterval <= 0 { return w.Sync() } @@ -369,6 +406,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { const ( walSeriesSimple = 1 walSamplesSimple = 1 + walDeletesSimple = 1 ) var walBuffers = sync.Pool{} @@ -445,6 +483,27 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error { return w.entry(WALEntrySamples, walSamplesSimple, buf) } +func (w *SegmentWAL) encodeDeletes(tr TombstoneReader) error { + b := make([]byte, 2*binary.MaxVarintLen64) + eb := &encbuf{b: b} + buf := getWALBuffer() + for tr.Next() { + eb.reset() + s := tr.At() + eb.putUvarint32(s.ref) + eb.putUvarint(len(s.intervals)) + buf = append(buf, eb.get()...) + for _, itv := range s.intervals { + eb.reset() + eb.putVarint64(itv.mint) + eb.putVarint64(itv.maxt) + buf = append(buf, eb.get()...) + } + } + + return w.entry(WALEntryDeletes, walDeletesSimple, buf) +} + // walReader decodes and emits write ahead log entries. type walReader struct { logger log.Logger @@ -454,9 +513,15 @@ type walReader struct { buf []byte crc32 hash.Hash32 - err error - labels []labels.Labels samples []RefSample + series []labels.Labels + stones []stone + + samplesFunc SamplesCB + seriesFunc SeriesCB + deletesFunc DeletesCB + + err error } func newWALReader(w *SegmentWAL, l log.Logger) *walReader { @@ -471,18 +536,22 @@ func newWALReader(w *SegmentWAL, l log.Logger) *walReader { } } -// At returns the last decoded entry of labels or samples. -// The returned slices are only valid until the next call to Next(). Their elements -// have to be copied to preserve them. -func (r *walReader) At() ([]labels.Labels, []RefSample) { - return r.labels, r.samples -} - // Err returns the last error the reader encountered. func (r *walReader) Err() error { return r.err } +func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { + r.samplesFunc = samplesf + r.seriesFunc = seriesf + r.deletesFunc = deletesf + + for r.next() { + } + + return r.Err() +} + // nextEntry retrieves the next entry. It is also used as a testing hook. func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { @@ -505,11 +574,12 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { return et, flag, b, err } -// Next returns decodes the next entry pair and returns true +// next returns decodes the next entry pair and returns true // if it was succesful. -func (r *walReader) Next() bool { - r.labels = r.labels[:0] +func (r *walReader) next() bool { + r.series = r.series[:0] r.samples = r.samples[:0] + r.stones = r.stones[:0] if r.cur >= len(r.wal.files) { return false @@ -537,7 +607,7 @@ func (r *walReader) Next() bool { return false } r.cur++ - return r.Next() + return r.next() } if err != nil { r.err = err @@ -550,16 +620,13 @@ func (r *walReader) Next() bool { // In decoding below we never return a walCorruptionErr for now. // Those should generally be catched by entry decoding before. - switch et { - case WALEntrySamples: - if err := r.decodeSamples(flag, b); err != nil { - r.err = err - } case WALEntrySeries: - if err := r.decodeSeries(flag, b); err != nil { - r.err = err - } + r.err = r.decodeSeries(flag, b) + case WALEntrySamples: + r.err = r.decodeSamples(flag, b) + case WALEntryDeletes: + r.err = r.decodeDeletes(flag, b) } return r.err == nil } @@ -617,7 +684,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { if etype == 0 { return 0, 0, nil, io.EOF } - if etype != WALEntrySeries && etype != WALEntrySamples { + if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes { return 0, 0, nil, walCorruptionErrf("invalid entry type %d", etype) } @@ -669,12 +736,14 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error { b = b[n+int(vl):] } - r.labels = append(r.labels, lset) + r.series = append(r.series, lset) } - return nil + return r.seriesFunc(r.series) } func (r *walReader) decodeSamples(flag byte, b []byte) error { + r.samples = r.samples[:] + if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -710,5 +779,30 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error { r.samples = append(r.samples, smpl) } - return nil + return r.samplesFunc(r.samples) +} + +func (r *walReader) decodeDeletes(flag byte, b []byte) error { + db := &decbuf{b: b} + r.samples = r.samples[:] + + for db.len() > 0 { + var s stone + s.ref = uint32(db.uvarint()) + l := db.uvarint() + if db.err() != nil { + return db.err() + } + + for i := 0; i < l; i++ { + s.intervals = append(s.intervals, interval{db.varint64(), db.varint64()}) + if db.err() != nil { + return db.err() + } + } + + r.stones = append(r.stones, s) + } + + return r.deletesFunc(r.stones) } diff --git a/wal_test.go b/wal_test.go index c2988e7e0..3f622df72 100644 --- a/wal_test.go +++ b/wal_test.go @@ -149,6 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels recordedSamples [][]RefSample + recordedDeletes [][]stone ) var totalSamples int @@ -166,32 +167,51 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels resultSamples [][]RefSample + resultDeletes [][]stone ) - for r.Next() { - lsets, smpls := r.At() - + serf := func(lsets []labels.Labels) error { if len(lsets) > 0 { clsets := make([]labels.Labels, len(lsets)) copy(clsets, lsets) resultSeries = append(resultSeries, clsets) } + + return nil + } + smplf := func(smpls []RefSample) error { if len(smpls) > 0 { csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } + + return nil } - require.NoError(t, r.Err()) + + // TODO: Add this. + delf := func(stones []stone) error { + if len(stones) > 0 { + cstones := make([]stone, len(stones)) + copy(cstones, stones) + resultDeletes = append(resultDeletes, cstones) + } + + return nil + } + + require.NoError(t, r.Read(serf, smplf, delf)) require.Equal(t, recordedSamples, resultSamples) require.Equal(t, recordedSeries, resultSeries) + require.Equal(t, recordedDeletes, resultDeletes) series := series[k : k+(numMetrics/iterations)] // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { var samples []RefSample + stones := map[uint32]intervals{} for j := 0; j < i*10; j++ { samples = append(samples, RefSample{ @@ -201,9 +221,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { }) } + for j := 0; j < i*20; j++ { + ts := rand.Int63() + stones[rand.Uint32()] = intervals{{ts, ts + rand.Int63n(10000)}} + } + lbls := series[i : i+stepSize] - require.NoError(t, w.Log(lbls, samples)) + require.NoError(t, w.LogSeries(lbls)) + require.NoError(t, w.LogSamples(samples)) + require.NoError(t, w.LogDeletes(newMapTombstoneReader(stones))) if len(lbls) > 0 { recordedSeries = append(recordedSeries, lbls) @@ -212,6 +239,16 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { recordedSamples = append(recordedSamples, samples) totalSamples += len(samples) } + if len(stones) > 0 { + tr := newMapTombstoneReader(stones) + newdels := []stone{} + for tr.Next() { + newdels = append(newdels, tr.At()) + } + require.NoError(t, tr.Err()) + + recordedDeletes = append(recordedDeletes, newdels) + } } require.NoError(t, w.Close()) @@ -292,13 +329,13 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) - require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) - require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) require.NoError(t, w.cut()) - require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}})) - require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) require.NoError(t, w.Close()) @@ -314,17 +351,28 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) r := w2.Reader() + serf := func(l []labels.Labels) error { + require.Equal(t, 0, len(l)) + return nil + } + delf := func([]stone) error { return nil } - require.True(t, r.Next()) - l, s := r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + // Weird hack to check order of reads. + i := 0 + samplf := func(s []RefSample) error { + if i == 0 { + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + i++ + } else { + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) + } - // Truncation should happen transparently and not cause an error. - require.False(t, r.Next()) - require.Nil(t, r.Err()) + return nil + } - require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}})) + require.NoError(t, r.Read(serf, samplf, delf)) + + require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) // We should see the first valid entry and the new one, everything after @@ -334,18 +382,8 @@ func TestWALRestoreCorrupted(t *testing.T) { r = w3.Reader() - require.True(t, r.Next()) - l, s = r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) - - require.True(t, r.Next()) - l, s = r.At() - require.Equal(t, 0, len(l)) - require.Equal(t, []RefSample{{T: 99, V: 100}}, s) - - require.False(t, r.Next()) - require.Nil(t, r.Err()) + i = 0 + require.NoError(t, r.Read(serf, samplf, delf)) }) } }