From 3ef4326114868a97c1c1d9f82bbc56010e6faf57 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 13 Nov 2017 13:32:24 +0100 Subject: [PATCH 1/3] Refactor tombstone reader types --- block.go | 24 +++++++++++--------- compact.go | 8 +++++-- head.go | 4 ++-- head_test.go | 2 +- querier.go | 7 +++++- querier_test.go | 18 +++++++-------- tombstones.go | 56 +++++++++++++++++++++++++++++----------------- tombstones_test.go | 8 +++---- 8 files changed, 77 insertions(+), 50 deletions(-) diff --git a/block.go b/block.go index f2fcf97ff2..59a9d4dc18 100644 --- a/block.go +++ b/block.go @@ -142,10 +142,9 @@ type Block struct { dir string meta BlockMeta - chunkr ChunkReader - indexr IndexReader - - tombstones tombstoneReader + chunkr ChunkReader + indexr IndexReader + tombstones TombstoneReader } // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used @@ -293,7 +292,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := map[uint64]Intervals{} + stones := memTombstones{} var lset labels.Labels var chks []ChunkMeta @@ -325,16 +324,21 @@ Outer: return p.Err() } - // Merge the current and new tombstones. - for k, v := range stones { - pb.tombstones.add(k, v[0]) + err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + for _, iv := range ivs { + stones.add(id, iv) + pb.meta.Stats.NumTombstones++ + } + return nil + }) + if err != nil { + return err } + pb.tombstones = stones if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil { return err } - - pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones)) return writeMetaFile(pb.dir, &pb.meta) } diff --git a/compact.go b/compact.go index 955ba3cafb..48d77f72c8 100644 --- a/compact.go +++ b/compact.go @@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -631,7 +631,11 @@ func (c *compactionSeriesSet) Next() bool { } var err error - c.intervals = c.tombstones.Get(c.p.At()) + c.intervals, err = c.tombstones.Get(c.p.At()) + if err != nil { + c.err = errors.Wrap(err, "get tombstones") + return false + } if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil { c.err = errors.Wrapf(err, "get series %d", c.p.At()) diff --git a/head.go b/head.go index 51a7896a2c..d8614add7e 100644 --- a/head.go +++ b/head.go @@ -66,7 +66,7 @@ type Head struct { postings *memPostings // postings lists for terms - tombstones tombstoneReader + tombstones memTombstones } type headMetrics struct { @@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: newUnorderedMemPostings(), - tombstones: newEmptyTombstoneReader(), + tombstones: memTombstones{}, } h.metrics = newHeadMetrics(h, r) diff --git a/head_test.go b/head_test.go index 277519ad43..c42c19b5e5 100644 --- a/head_test.go +++ b/head_test.go @@ -318,7 +318,7 @@ func TestHeadDeleteSimple(t *testing.T) { Outer: for _, c := range cases { // Reset the tombstones. - head.tombstones = newEmptyTombstoneReader() + head.tombstones = memTombstones{} // Delete the ranges. for _, r := range c.intervals { diff --git a/querier.go b/querier.go index 89a9547952..faa29ecda8 100644 --- a/querier.go +++ b/querier.go @@ -465,6 +465,7 @@ func (s *baseChunkSeries) Next() bool { var ( lset labels.Labels chunks []ChunkMeta + err error ) Outer: for s.p.Next() { @@ -487,7 +488,11 @@ Outer: s.lset = lset s.chks = chunks - s.intervals = s.tombstones.Get(s.p.At()) + s.intervals, err = s.tombstones.Get(s.p.At()) + if err != nil { + s.err = errors.Wrap(err, "get tombstones") + return false + } if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. diff --git a/querier_test.go b/querier_test.go index e302e29838..301a2b0467 100644 --- a/querier_test.go +++ b/querier_test.go @@ -454,7 +454,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: newEmptyTombstoneReader(), + tombstones: EmptyTombstoneReader(), mint: c.mint, maxt: c.maxt, @@ -506,7 +506,7 @@ func TestBlockQuerierDelete(t *testing.T) { chunks [][]sample } - tombstones tombstoneReader + tombstones TombstoneReader queries []query }{ data: []struct { @@ -554,13 +554,11 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: newTombstoneReader( - map[uint64]Intervals{ - 1: Intervals{{1, 3}}, - 2: Intervals{{1, 3}, {6, 10}}, - 3: Intervals{{6, 10}}, - }, - ), + tombstones: memTombstones{ + 1: Intervals{{1, 3}}, + 2: Intervals{{1, 3}, {6, 10}}, + 3: Intervals{{6, 10}}, + }, queries: []query{ { @@ -736,7 +734,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: newListPostings(tc.postings), index: mi, - tombstones: newEmptyTombstoneReader(), + tombstones: EmptyTombstoneReader(), } i := 0 diff --git a/tombstones.go b/tombstones.go index d43cd0bd0c..8ca089e617 100644 --- a/tombstones.go +++ b/tombstones.go @@ -35,12 +35,17 @@ const ( // TombstoneReader gives access to tombstone intervals by series reference. type TombstoneReader interface { - Get(ref uint64) Intervals + // Get returns deletion intervals for the series with the given reference. + Get(ref uint64) (Intervals, error) + // Iter calls the given function for each encountered interval. + Iter(func(uint64, Intervals) error) error + + // Close any underlying resources Close() error } -func writeTombstoneFile(dir string, tr tombstoneReader) error { +func writeTombstoneFile(dir string, tr TombstoneReader) error { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" hash := newCRC32() @@ -67,19 +72,21 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { mw := io.MultiWriter(f, hash) - for k, v := range tr { - for _, itv := range v { + tr.Iter(func(ref uint64, ivs Intervals) error { + for _, iv := range ivs { buf.reset() - buf.putUvarint64(k) - buf.putVarint64(itv.Mint) - buf.putVarint64(itv.Maxt) + + buf.putUvarint64(ref) + buf.putVarint64(iv.Mint) + buf.putVarint64(iv.Maxt) _, err = mw.Write(buf.get()) if err != nil { return err } } - } + return nil + }) _, err = f.Write(hash.Sum(nil)) if err != nil { @@ -100,7 +107,7 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (tombstoneReader, error) { +func readTombstones(dir string) (memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if err != nil { return nil, err @@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) { return nil, errors.New("checksum did not match") } - stonesMap := newEmptyTombstoneReader() + stonesMap := memTombstones{} + for d.len() > 0 { k := d.uvarint64() mint := d.varint64() @@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) { stonesMap.add(k, Interval{mint, maxt}) } - return newTombstoneReader(stonesMap), nil + return stonesMap, nil } -type tombstoneReader map[uint64]Intervals +type memTombstones map[uint64]Intervals -func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader { - return tombstoneReader(ts) +var emptyTombstoneReader = memTombstones{} + +// EmptyTombstoneReader returns a TombstoneReader that is always empty. +func EmptyTombstoneReader() TombstoneReader { + return emptyTombstoneReader } -func newEmptyTombstoneReader() tombstoneReader { - return tombstoneReader(make(map[uint64]Intervals)) +func (t memTombstones) Get(ref uint64) (Intervals, error) { + return t[ref], nil } -func (t tombstoneReader) Get(ref uint64) Intervals { - return t[ref] +func (t memTombstones) Iter(f func(uint64, Intervals) error) error { + for ref, ivs := range t { + if err := f(ref, ivs); err != nil { + return err + } + } + return nil } -func (t tombstoneReader) add(ref uint64, itv Interval) { +func (t memTombstones) add(ref uint64, itv Interval) { t[ref] = t[ref].add(itv) } -func (tombstoneReader) Close() error { +func (memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index 9265b76b38..eb124fc941 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -29,7 +29,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := make(map[uint64]Intervals) + stones := memTombstones{} // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -43,13 +43,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) { stones[ref] = dranges } - require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones))) + require.NoError(t, writeTombstoneFile(tmpdir, stones)) restr, err := readTombstones(tmpdir) require.NoError(t, err) - exptr := newTombstoneReader(stones) + // Compare the two readers. - require.Equal(t, exptr, restr) + require.Equal(t, stones, restr) } func TestAddingNewIntervals(t *testing.T) { From f1512a368acb6e596a219f7f5322cd59bfbd4ecf Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 13 Nov 2017 13:57:10 +0100 Subject: [PATCH 2/3] Expose ChunkSeriesSet and lookups methods. --- block.go | 3 +-- compact.go | 13 +++-------- head.go | 3 +-- querier.go | 66 +++++++++++++++++++++++++++--------------------------- 4 files changed, 38 insertions(+), 47 deletions(-) diff --git a/block.go b/block.go index 59a9d4dc18..8456cb3762 100644 --- a/block.go +++ b/block.go @@ -283,8 +283,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { return ErrClosing } - pr := newPostingsReader(pb.indexr) - p, absent, err := pr.Select(ms...) + p, absent, err := PostingsForMatchers(pb.indexr, ms...) if err != nil { return errors.Wrap(err, "select series") } diff --git a/compact.go b/compact.go index 48d77f72c8..2dc939bc1e 100644 --- a/compact.go +++ b/compact.go @@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // of the provided blocks. It returns meta information for the new block. func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { var ( - set compactionSet + set ChunkSeriesSet allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) @@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return nil } -type compactionSet interface { - Next() bool - At() (labels.Labels, []ChunkMeta, Intervals) - Err() error -} - type compactionSeriesSet struct { p Postings index IndexReader chunks ChunkReader tombstones TombstoneReader - series SeriesSet l labels.Labels c []ChunkMeta @@ -679,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { } type compactionMerger struct { - a, b compactionSet + a, b ChunkSeriesSet aok, bok bool l labels.Labels @@ -692,7 +685,7 @@ type compactionSeries struct { chunks []*ChunkMeta } -func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) { +func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { c := &compactionMerger{ a: a, b: b, diff --git a/head.go b/head.go index d8614add7e..d149cb1d9c 100644 --- a/head.go +++ b/head.go @@ -574,8 +574,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := h.indexRange(mint, maxt) - pr := newPostingsReader(ir) - p, absent, err := pr.Select(ms...) + p, absent, err := PostingsForMatchers(ir, ms...) if err != nil { return errors.Wrap(err, "select series") } diff --git a/querier.go b/querier.go index faa29ecda8..37672c7156 100644 --- a/querier.go +++ b/querier.go @@ -151,22 +151,13 @@ type blockQuerier struct { } func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) { - pr := newPostingsReader(q.index) - - p, absent, err := pr.Select(ms...) + base, err := LookupChunkSeries(q.index, q.tombstones, ms...) if err != nil { return nil, err } - return &blockSeriesSet{ set: &populatedChunkSeries{ - set: &baseChunkSeries{ - p: p, - index: q.index, - absent: absent, - - tombstones: q.tombstones, - }, + set: base, chunks: q.chunks, mint: q.mint, maxt: q.maxt, @@ -208,16 +199,10 @@ func (q *blockQuerier) Close() error { return merr.Err() } -// postingsReader is used to select matching postings from an IndexReader. -type postingsReader struct { - index IndexReader -} - -func newPostingsReader(i IndexReader) *postingsReader { - return &postingsReader{index: i} -} - -func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) { +// PostingsForMatchers assembles a single postings iterator against the index reader +// based on the given matchers. It returns a list of label names that must be manually +// checked to not exist in series the postings list points to. +func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) { var ( its []Postings absent []string @@ -229,16 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error absent = append(absent, m.Name()) continue } - it, err := r.selectSingle(m) + it, err := postingsForMatcher(index, m) if err != nil { return nil, nil, err } its = append(its, it) } - - p := Intersect(its...) - - return r.index.SortedPostings(p), absent, nil + return index.SortedPostings(Intersect(its...)), absent, nil } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -272,17 +254,17 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) return matches, nil } -func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) { +func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { // Fast-path for equal matching. if em, ok := m.(*labels.EqualMatcher); ok { - it, err := r.index.Postings(em.Name(), em.Value()) + it, err := index.Postings(em.Name(), em.Value()) if err != nil { return nil, err } return it, nil } - tpls, err := r.index.LabelValues(m.Name()) + tpls, err := index.LabelValues(m.Name()) if err != nil { return nil, err } @@ -313,7 +295,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) { var rit []Postings for _, v := range res { - it, err := r.index.Postings(m.Name(), v) + it, err := index.Postings(m.Name(), v) if err != nil { return nil, err } @@ -435,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool { return true } -type chunkSeriesSet interface { +type ChunkSeriesSet interface { Next() bool At() (labels.Labels, []ChunkMeta, Intervals) Err() error @@ -455,6 +437,24 @@ type baseChunkSeries struct { err error } +// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet +// over them. It drops chunks based on tombstones in the given reader. +func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { + if tr == nil { + tr = EmptyTombstoneReader() + } + p, absent, err := PostingsForMatchers(ir, ms...) + if err != nil { + return nil, err + } + return &baseChunkSeries{ + p: p, + index: ir, + tombstones: tr, + absent: absent, + }, nil +} + func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { return s.lset, s.chks, s.intervals } @@ -518,7 +518,7 @@ Outer: // with known chunk references. It filters out chunks that do not fit the // given time range. type populatedChunkSeries struct { - set chunkSeriesSet + set ChunkSeriesSet chunks ChunkReader mint, maxt int64 @@ -575,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { - set chunkSeriesSet + set ChunkSeriesSet err error cur Series From a031cf7424079c17746314bc29e3ff907d3b97ba Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 14 Nov 2017 15:25:30 +0100 Subject: [PATCH 3/3] Return ULID in Compactor --- compact.go | 6 +++--- db.go | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/compact.go b/compact.go index 2dc939bc1e..35cb36a63c 100644 --- a/compact.go +++ b/compact.go @@ -52,7 +52,7 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(dest string, b BlockReader, mint, maxt int64) error + Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). @@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { return c.write(dest, compactBlockMetas(uid, metas...), blocks...) } -func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error { +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -333,7 +333,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} - return c.write(dest, meta, b) + return uid, c.write(dest, meta, b) } // instrumentedChunkWriter is used for level 1 compactions to record statistics diff --git a/db.go b/db.go index f1972f00bc..4e458dea04 100644 --- a/db.go +++ b/db.go @@ -344,7 +344,7 @@ func (db *DB) compact() (changes bool, err error) { mint: mint, maxt: maxt, } - if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { + if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true @@ -615,7 +615,8 @@ func (db *DB) Snapshot(dir string) error { return errors.Wrap(err, "error snapshotting headblock") } } - return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) + _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) + return errors.Wrap(err, "snapshot head block") } // Querier returns a new querier over the data partition for the given time range.