From 44e9ae38b50004afa6abedc0e2b4aee73fb9d371 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 26 May 2017 21:26:31 +0530 Subject: [PATCH] Incorporate PR feedback. * Expose Stone as it is used in an exported method. * Move from tombstoneReader to []Stone for the same reason as above. * Make WAL reading a little cleaner. Signed-off-by: Goutham Veeramachaneni --- Documentation/format/tombstones.md | 3 +- block.go | 51 ++++++------- block_test.go | 13 ++++ compact.go | 2 +- encoding_helpers.go | 1 - head.go | 37 ++++------ querier.go | 2 +- tombstones.go | 37 ++++++++-- tombstones_test.go | 13 ++++ wal.go | 115 ++++++++++++++++------------- wal_test.go | 23 +++--- 11 files changed, 173 insertions(+), 124 deletions(-) diff --git a/Documentation/format/tombstones.md b/Documentation/format/tombstones.md index 059d1ace5..2af0ac98c 100644 --- a/Documentation/format/tombstones.md +++ b/Documentation/format/tombstones.md @@ -1,6 +1,7 @@ # Tombstones Disk Format -The following describes the format of a tombstones file, which is the directory of a block. +The following describes the format of a tombstones file, which is placed +at the top level directory of a block. The last 8 bytes specifies the offset to the start of Stones section. The stones section is 0 padded to a multiple of 4 for fast scans. diff --git a/block.go b/block.go index b0f88512f..6f2116490 100644 --- a/block.go +++ b/block.go @@ -86,18 +86,16 @@ type BlockMeta struct { // Stats about the contents of the block. Stats struct { - NumSamples uint64 `json:"numSamples,omitempty"` - NumSeries uint64 `json:"numSeries,omitempty"` - NumChunks uint64 `json:"numChunks,omitempty"` + NumSamples uint64 `json:"numSamples,omitempty"` + NumSeries uint64 `json:"numSeries,omitempty"` + NumChunks uint64 `json:"numChunks,omitempty"` + NumTombstones uint64 `json:"numTombstones,omitempty"` } `json:"stats,omitempty"` // Information on compactions the block was created from. Compaction struct { Generation int `json:"generation"` } `json:"compaction"` - - // The number of tombstones. - NumTombstones int64 `json:"numTombstones"` } const ( @@ -161,7 +159,6 @@ type persistedBlock struct { chunkr *chunkReader indexr *indexReader - // For tombstones. tombstones tombstoneReader } @@ -186,11 +183,10 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { } pb := &persistedBlock{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, - + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, tombstones: tr, } return pb, nil @@ -234,7 +230,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - newStones := map[uint32]intervals{} + stones := map[uint32]intervals{} Outer: for p.Next() { @@ -251,15 +247,9 @@ Outer: for _, chk := range chunks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { - // Delete only until the current maxtime and not beyond. - maxtime := chunks[len(chunks)-1].MaxTime - if maxtime > maxt { - maxtime = maxt - } - if mint < chunks[0].MinTime { - mint = chunks[0].MinTime - } - newStones[p.At()] = intervals{{mint, maxtime}} + // Delete only until the current vlaues and not beyond. + mint, maxt = clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime) + stones[p.At()] = intervals{{mint, maxt}} continue Outer } } @@ -270,21 +260,32 @@ Outer: } // Merge the current and new tombstones. - for k, v := range newStones { - pb.tombstones[k] = pb.tombstones[k].add(v[0]) + for k, v := range stones { + pb.tombstones.add(k, v[0]) } if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { return err } - pb.meta.NumTombstones = int64(len(pb.tombstones)) + pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones)) return writeMetaFile(pb.dir, &pb.meta) } func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } func walDir(dir string) string { return filepath.Join(dir, "wal") } +func clampInterval(a, b, mint, maxt int64) (int64, int64) { + if a < mint { + a = mint + } + if b > maxt { + b = maxt + } + + return a, b +} + type mmapFile struct { f *os.File b []byte diff --git a/block_test.go b/block_test.go index 35178ff49..e75d4ac3f 100644 --- a/block_test.go +++ b/block_test.go @@ -1 +1,14 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb diff --git a/compact.go b/compact.go index 6bf798833..46ce52eab 100644 --- a/compact.go +++ b/compact.go @@ -428,7 +428,7 @@ func (c *compactionSeriesSet) Next() bool { return false } - c.intervals = c.tombstones.At(c.p.At()) + c.intervals = c.tombstones.Get(c.p.At()) c.l, c.c, c.err = c.index.Series(c.p.At()) if c.err != nil { diff --git a/encoding_helpers.go b/encoding_helpers.go index c1ea902a7..25ff32d00 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -154,7 +154,6 @@ func (d *decbuf) byte() byte { x := d.b[0] d.b = d.b[1:] return x - } func (d *decbuf) decbuf(l int) decbuf { diff --git a/head.go b/head.go index 704c0b222..833f02411 100644 --- a/head.go +++ b/head.go @@ -150,10 +150,10 @@ func (h *HeadBlock) init() error { return nil } - deletesFunc := func(stones []stone) error { + deletesFunc := func(stones []Stone) error { for _, s := range stones { for _, itv := range s.intervals { - h.tombstones[s.ref] = h.tombstones[s.ref].add(itv) + h.tombstones.add(s.ref, itv) } } @@ -230,7 +230,8 @@ func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { pr := newPostingsReader(ir) p, absent := pr.Select(ms...) - newStones := make(map[uint32]intervals) + var stones []Stone + Outer: for p.Next() { ref := p.At() @@ -242,29 +243,22 @@ Outer: } // Delete only until the current values and not beyond. - maxtime := h.series[ref].head().maxTime - if maxtime > maxt { - maxtime = maxt - } - if mint < h.series[ref].chunks[0].minTime { - mint = h.series[ref].chunks[0].minTime - } - - newStones[ref] = intervals{{mint, maxtime}} + mint, maxt = clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime) + stones = append(stones, Stone{ref, intervals{{mint, maxt}}}) } if p.Err() != nil { return p.Err() } - if err := h.wal.LogDeletes(newTombstoneReader(newStones)); err != nil { + if err := h.wal.LogDeletes(stones); err != nil { return err } - for k, v := range newStones { - h.tombstones[k] = h.tombstones[k].add(v[0]) + for _, s := range stones { + h.tombstones.add(s.ref, s.intervals[0]) } - h.meta.NumTombstones = int64(len(h.tombstones)) + h.meta.Stats.NumTombstones = uint64(len(h.tombstones)) return nil } @@ -510,14 +504,13 @@ func (a *headAppender) Commit() error { } } - var err MultiError - // Write all new series and samples to the WAL and add it to the // in-mem database on success. - err.Add(a.wal.LogSeries(a.newLabels)) - err.Add(a.wal.LogSamples(a.samples)) - if err.Err() != nil { - return err.Err() + if err := a.wal.LogSeries(a.newLabels); err != nil { + return errors.Wrap(err, "WAL log series") + } + if err := a.wal.LogSamples(a.samples); err != nil { + return errors.Wrap(err, "WAL log samples") } total := uint64(len(a.samples)) diff --git a/querier.go b/querier.go index 49cd013f4..601fc7440 100644 --- a/querier.go +++ b/querier.go @@ -412,7 +412,7 @@ Outer: s.lset = lset s.chks = chunks - s.intervals = s.tombstones.At(s.p.At()) + s.intervals = s.tombstones.Get(s.p.At()) if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. diff --git a/tombstones.go b/tombstones.go index 7abb025a1..612b3029f 100644 --- a/tombstones.go +++ b/tombstones.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -66,16 +79,16 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { return renameFile(tmp, path) } -// stone holds the information on the posting and time-range +// Stone holds the information on the posting and time-range // that is deleted. -type stone struct { +type Stone struct { ref uint32 intervals intervals } // TombstoneReader is the iterator over tombstones. type TombstoneReader interface { - At(ref uint32) intervals + Get(ref uint32) intervals } func readTombstones(dir string) (tombstoneReader, error) { @@ -84,6 +97,10 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, err } + if len(b) < 5 { + return nil, errors.Wrap(errInvalidSize, "tombstones header") + } + d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { return nil, fmt.Errorf("invalid magic number %x", mg) @@ -92,6 +109,10 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, fmt.Errorf("invalid tombstone format %x", flag) } + if d.err() != nil { + return nil, d.err() + } + // Verify checksum hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) if _, err := hash.Write(d.get()); err != nil { @@ -101,7 +122,7 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, errors.New("checksum did not match") } - stonesMap := make(map[uint32]intervals) + stonesMap := newEmptyTombstoneReader() for d.len() > 0 { k := d.uvarint32() mint := d.varint64() @@ -110,7 +131,7 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, d.err() } - stonesMap[k] = stonesMap[k].add(interval{mint, maxt}) + stonesMap.add(k, interval{mint, maxt}) } return newTombstoneReader(stonesMap), nil @@ -126,10 +147,14 @@ func newEmptyTombstoneReader() tombstoneReader { return tombstoneReader(make(map[uint32]intervals)) } -func (t tombstoneReader) At(ref uint32) intervals { +func (t tombstoneReader) Get(ref uint32) intervals { return t[ref] } +func (t tombstoneReader) add(ref uint32, itv interval) { + t[ref] = t[ref].add(itv) +} + type interval struct { mint, maxt int64 } diff --git a/tombstones_test.go b/tombstones_test.go index 6469d0fbe..bc6199f11 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( diff --git a/wal.go b/wal.go index bf0b1af87..50ddb6e34 100644 --- a/wal.go +++ b/wal.go @@ -56,7 +56,7 @@ type SamplesCB func([]RefSample) error type SeriesCB func([]labels.Labels) error // DeletesCB is the callback after reading deletes. -type DeletesCB func([]stone) error +type DeletesCB func([]Stone) error // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { @@ -83,7 +83,7 @@ type WAL interface { Reader() WALReader LogSeries([]labels.Labels) error LogSamples([]RefSample) error - LogDeletes(tombstoneReader) error + LogDeletes([]Stone) error Close() error } @@ -180,8 +180,8 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { } // LogDeletes write a batch of new deletes to the log. -func (w *SegmentWAL) LogDeletes(tr tombstoneReader) error { - if err := w.encodeDeletes(tr); err != nil { +func (w *SegmentWAL) LogDeletes(stones []Stone) error { + if err := w.encodeDeletes(stones); err != nil { return err } @@ -483,14 +483,14 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error { return w.entry(WALEntrySamples, walSamplesSimple, buf) } -func (w *SegmentWAL) encodeDeletes(tr tombstoneReader) error { +func (w *SegmentWAL) encodeDeletes(stones []Stone) error { b := make([]byte, 2*binary.MaxVarintLen64) eb := &encbuf{b: b} buf := getWALBuffer() - for k, v := range tr { - for _, itv := range v { + for _, s := range stones { + for _, itv := range s.intervals { eb.reset() - eb.putUvarint32(k) + eb.putUvarint32(s.ref) eb.putVarint64(itv.mint) eb.putVarint64(itv.maxt) buf = append(buf, eb.get()...) @@ -509,13 +509,9 @@ type walReader struct { buf []byte crc32 hash.Hash32 - samples []RefSample - series []labels.Labels - stones []stone - - samplesFunc SamplesCB - seriesFunc SeriesCB - deletesFunc DeletesCB + curType WALEntryType + curFlag byte + curBuf []byte err error } @@ -538,11 +534,30 @@ func (r *walReader) Err() error { } func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesCB) error { - r.samplesFunc = samplesf - r.seriesFunc = seriesf - r.deletesFunc = deletesf - for r.next() { + et, flag, b := r.at() + // In decoding below we never return a walCorruptionErr for now. + // Those should generally be catched by entry decoding before. + switch et { + case WALEntrySeries: + s, err := r.decodeSeries(flag, b) + if err != nil { + return err + } + seriesf(s) + case WALEntrySamples: + s, err := r.decodeSamples(flag, b) + if err != nil { + return err + } + samplesf(s) + case WALEntryDeletes: + s, err := r.decodeDeletes(flag, b) + if err != nil { + return err + } + deletesf(s) + } } return r.Err() @@ -570,13 +585,13 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { return et, flag, b, err } +func (r *walReader) at() (WALEntryType, byte, []byte) { + return r.curType, r.curFlag, r.curBuf +} + // next returns decodes the next entry pair and returns true // if it was succesful. func (r *walReader) next() bool { - r.series = r.series[:0] - r.samples = r.samples[:0] - r.stones = r.stones[:0] - if r.cur >= len(r.wal.files) { return false } @@ -614,16 +629,9 @@ func (r *walReader) next() bool { return false } - // In decoding below we never return a walCorruptionErr for now. - // Those should generally be catched by entry decoding before. - switch et { - case WALEntrySeries: - r.err = r.decodeSeries(flag, b) - case WALEntrySamples: - r.err = r.decodeSamples(flag, b) - case WALEntryDeletes: - r.err = r.decodeDeletes(flag, b) - } + r.curType = et + r.curFlag = flag + r.curBuf = b return r.err == nil } @@ -707,11 +715,12 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte) error { +func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) { + series := []labels.Labels{} for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "number of labels") + return nil, errors.Wrap(errInvalidSize, "number of labels") } b = b[n:] lset := make(labels.Labels, l) @@ -719,29 +728,29 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error { for i := 0; i < int(l); i++ { nl, n := binary.Uvarint(b) if n < 1 || len(b) < n+int(nl) { - return errors.Wrap(errInvalidSize, "label name") + return nil, errors.Wrap(errInvalidSize, "label name") } lset[i].Name = string(b[n : n+int(nl)]) b = b[n+int(nl):] vl, n := binary.Uvarint(b) if n < 1 || len(b) < n+int(vl) { - return errors.Wrap(errInvalidSize, "label value") + return nil, errors.Wrap(errInvalidSize, "label value") } lset[i].Value = string(b[n : n+int(vl)]) b = b[n+int(vl):] } - r.series = append(r.series, lset) + series = append(series, lset) } - return r.seriesFunc(r.series) + return series, nil } -func (r *walReader) decodeSamples(flag byte, b []byte) error { - r.samples = r.samples[:] +func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { + samples := []RefSample{} if len(b) < 16 { - return errors.Wrap(errInvalidSize, "header length") + return nil, errors.Wrap(errInvalidSize, "header length") } var ( baseRef = binary.BigEndian.Uint64(b) @@ -754,7 +763,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error { dref, n := binary.Varint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "sample ref delta") + return nil, errors.Wrap(errInvalidSize, "sample ref delta") } b = b[n:] @@ -762,36 +771,36 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error { dtime, n := binary.Varint(b) if n < 1 { - return errors.Wrap(errInvalidSize, "sample timestamp delta") + return nil, errors.Wrap(errInvalidSize, "sample timestamp delta") } b = b[n:] smpl.T = baseTime + dtime if len(b) < 8 { - return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) + return nil, errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) } smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] - r.samples = append(r.samples, smpl) + samples = append(samples, smpl) } - return r.samplesFunc(r.samples) + return samples, nil } -func (r *walReader) decodeDeletes(flag byte, b []byte) error { +func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { db := &decbuf{b: b} - r.samples = r.samples[:] + stones := []Stone{} for db.len() > 0 { - var s stone + var s Stone s.ref = db.uvarint32() s.intervals = intervals{{db.varint64(), db.varint64()}} if db.err() != nil { - return db.err() + return nil, db.err() } - r.stones = append(r.stones, s) + stones = append(stones, s) } - return r.deletesFunc(r.stones) + return stones, nil } diff --git a/wal_test.go b/wal_test.go index f706849d4..23667fb9a 100644 --- a/wal_test.go +++ b/wal_test.go @@ -149,7 +149,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels recordedSamples [][]RefSample - recordedDeletes []tombstoneReader + recordedDeletes [][]Stone ) var totalSamples int @@ -167,7 +167,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels resultSamples [][]RefSample - resultDeletes []tombstoneReader + resultDeletes [][]Stone ) serf := func(lsets []labels.Labels) error { @@ -189,13 +189,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { return nil } - delf := func(stones []stone) error { + delf := func(stones []Stone) error { if len(stones) > 0 { - dels := make(map[uint32]intervals) - for _, s := range stones { - dels[s.ref] = s.intervals - } - resultDeletes = append(resultDeletes, newTombstoneReader(dels)) + resultDeletes = append(resultDeletes, stones) } return nil @@ -212,7 +208,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { var samples []RefSample - stones := map[uint32]intervals{} + var stones []Stone for j := 0; j < i*10; j++ { samples = append(samples, RefSample{ @@ -224,14 +220,14 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { for j := 0; j < i*20; j++ { ts := rand.Int63() - stones[rand.Uint32()] = intervals{{ts, ts + rand.Int63n(10000)}} + stones = append(stones, Stone{rand.Uint32(), intervals{{ts, ts + rand.Int63n(10000)}}}) } lbls := series[i : i+stepSize] require.NoError(t, w.LogSeries(lbls)) require.NoError(t, w.LogSamples(samples)) - require.NoError(t, w.LogDeletes(newTombstoneReader(stones))) + require.NoError(t, w.LogDeletes(stones)) if len(lbls) > 0 { recordedSeries = append(recordedSeries, lbls) @@ -241,8 +237,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { totalSamples += len(samples) } if len(stones) > 0 { - tr := newTombstoneReader(stones) - recordedDeletes = append(recordedDeletes, tr) + recordedDeletes = append(recordedDeletes, stones) } } @@ -350,7 +345,7 @@ func TestWALRestoreCorrupted(t *testing.T) { require.Equal(t, 0, len(l)) return nil } - delf := func([]stone) error { return nil } + delf := func([]Stone) error { return nil } // Weird hack to check order of reads. i := 0