From e5ce2bef43af82d5943d8eeda3613c1c98397cc9 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 13 Nov 2017 12:16:58 +0100
Subject: [PATCH 1/7] Add explicit error to Querier.Select

This has been a frequent source of debugging pain since errors are
potentially delayed to a much later point. They bubble up in an
unrelated execution path.
---
 block.go        |  5 +++-
 db_test.go      | 35 ++++++++++++++------------
 head.go         |  5 +++-
 head_test.go    |  3 ++-
 postings.go     |  5 ++++
 querier.go      | 65 +++++++++++++++++++++++++++++++------------------
 querier_test.go |  8 +++---
 7 files changed, 81 insertions(+), 45 deletions(-)

diff --git a/block.go b/block.go
index 034f336730..f2fcf97ff2 100644
--- a/block.go
+++ b/block.go
@@ -285,7 +285,10 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	}
 
 	pr := newPostingsReader(pb.indexr)
-	p, absent := pr.Select(ms...)
+	p, absent, err := pr.Select(ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	ir := pb.indexr
 
diff --git a/db_test.go b/db_test.go
index 9f91705e7e..8eb676af8f 100644
--- a/db_test.go
+++ b/db_test.go
@@ -36,8 +36,11 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
 	return db, func() { os.RemoveAll(tmpdir) }
 }
 
-// Convert a SeriesSet into a form useable with reflect.DeepEqual.
-func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
+// query runs a matcher query against the querier and fully expands its data.
+func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample {
+	ss, err := q.Select(matchers...)
+	Ok(t, err)
+
 	result := map[string][]sample{}
 
 	for ss.Next() {
@@ -49,12 +52,12 @@
 			t, v := it.At()
 			samples = append(samples, sample{t: t, v: v})
 		}
-		require.NoError(t, it.Err())
+		Ok(t, it.Err())
 
 		name := series.Labels().String()
 		result[name] = samples
 	}
-	require.NoError(t, ss.Err())
+	Ok(t, ss.Err())
 
 	return result
 }
@@ -70,7 +73,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	querier, err := db.Querier(0, 1)
 	require.NoError(t, err)
 
-	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{})
 
 	require.NoError(t, querier.Close())
@@ -82,7 +85,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	require.NoError(t, err)
 	defer querier.Close()
 
-	seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
 }
 
@@ -102,7 +105,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
 	require.NoError(t, err)
 	defer querier.Close()
 
-	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{})
 }
 
@@ -146,7 +149,7 @@ func TestDBAppenderAddRef(t *testing.T) {
 	q, err := db.Querier(0, 200)
 	require.NoError(t, err)
 
-	res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b")))
+	res := query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.FromStrings("a", "b").String(): []sample{
@@ -198,7 +201,8 @@ Outer:
 		q, err := db.Querier(0, numSamples)
 		require.NoError(t, err)
 
-		res := q.Select(labels.NewEqualMatcher("a", "b"))
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
 
 		expSamples := make([]sample, 0, len(c.remaint))
 		for _, ts := range c.remaint {
@@ -294,8 +298,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	q, err := db.Querier(0, 10)
 	require.NoError(t, err)
 
-	ss := q.Select(labels.NewEqualMatcher("a", "b"))
-	ssMap := readSeriesSet(t, ss)
+	ssMap := query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}},
@@ -314,8 +317,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	q, err = db.Querier(0, 10)
 	require.NoError(t, err)
 
-	ss = q.Select(labels.NewEqualMatcher("a", "b"))
-	ssMap = readSeriesSet(t, ss)
+	ssMap = query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}},
@@ -352,7 +354,9 @@ func TestDB_Snapshot(t *testing.T) {
 	defer querier.Close()
 
 	// sum values
-	seriesSet := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	require.NoError(t, err)
+
 	sum := 0.0
 	for seriesSet.Next() {
 		series := seriesSet.At().Iterator()
@@ -500,7 +504,8 @@ func TestDB_e2e(t *testing.T) {
 			q, err := db.Querier(mint, maxt)
 			require.NoError(t, err)
 
-			ss := q.Select(qry.ms...)
+			ss, err := q.Select(qry.ms...)
+			require.NoError(t, err)
 
 			result := map[string][]sample{}
 
diff --git a/head.go b/head.go
index bd22148d16..51a7896a2c 100644
--- a/head.go
+++ b/head.go
@@ -575,7 +575,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	ir := h.indexRange(mint, maxt)
 
 	pr := newPostingsReader(ir)
-	p, absent := pr.Select(ms...)
+	p, absent, err := pr.Select(ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	var stones []Stone
 
diff --git a/head_test.go b/head_test.go
index a265d5b343..277519ad43 100644
--- a/head_test.go
+++ b/head_test.go
@@ -328,7 +328,8 @@ Outer:
 		// Compare the result.
 		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
 		require.NoError(t, err)
-		res := q.Select(labels.NewEqualMatcher("a", "b"))
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
 
 		expSamples := make([]sample, 0, len(c.remaint))
 		for _, ts := range c.remaint {
diff --git a/postings.go b/postings.go
index 2647f4dd8e..63fb1e31a0 100644
--- a/postings.go
+++ b/postings.go
@@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err }
 
 var emptyPostings = errPostings{}
 
+// EmptyPostings returns a postings list that's always empty.
+func EmptyPostings() Postings {
+	return emptyPostings
+}
+
 // Intersect returns a new postings list over the intersection of the
 // input postings.
 func Intersect(its ...Postings) Postings {
diff --git a/querier.go b/querier.go
index ed8b64ceac..89a9547952 100644
--- a/querier.go
+++ b/querier.go
@@ -27,7 +27,7 @@ import (
 // time range.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...labels.Matcher) SeriesSet
+	Select(...labels.Matcher) (SeriesSet, error)
 
 	// LabelValues returns all potential values for a label name.
 	LabelValues(string) ([]string, error)
@@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 	return nil, fmt.Errorf("not implemented")
 }
 
-func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
 	return q.sel(q.blocks, ms)
 }
 
-func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
 	if len(qs) == 0 {
-		return nopSeriesSet{}
+		return EmptySeriesSet(), nil
 	}
 	if len(qs) == 1 {
 		return qs[0].Select(ms...)
 	}
 	l := len(qs) / 2
-	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
+
+	a, err := q.sel(qs[:l], ms)
+	if err != nil {
+		return nil, err
+	}
+	b, err := q.sel(qs[l:], ms)
+	if err != nil {
+		return nil, err
+	}
+	return newMergedSeriesSet(a, b), nil
 }
 
 func (q *querier) Close() error {
@@ -141,10 +150,13 @@ type blockQuerier struct {
 	mint, maxt int64
 }
 
-func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
 	pr := newPostingsReader(q.index)
 
-	p, absent := pr.Select(ms...)
+	p, absent, err := pr.Select(ms...)
+	if err != nil {
+		return nil, err
+	}
 
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
@@ -162,7 +174,7 @@
 
 		mint: q.mint,
 		maxt: q.maxt,
-	}
+	}, nil
 }
 
 func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@@ -205,7 +217,7 @@ func newPostingsReader(i IndexReader) *postingsReader {
 	return &postingsReader{index: i}
 }
 
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
+func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -217,12 +229,16 @@
 			absent = append(absent, m.Name())
 			continue
 		}
-		its = append(its, r.selectSingle(m))
+		it, err := r.selectSingle(m)
+		if err != nil {
+			return nil, nil, err
+		}
+		its = append(its, it)
 	}
 
 	p := Intersect(its...)
 
-	return r.index.SortedPostings(p), absent
+	return r.index.SortedPostings(p), absent, nil
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -256,33 +272,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }
 
-func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
+func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
 		it, err := r.index.Postings(em.Name(), em.Value())
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
-		return it
+		return it, nil
 	}
 
 	tpls, err := r.index.LabelValues(m.Name())
 	if err != nil {
-		return errPostings{err: err}
+		return nil, err
 	}
 
 	var res []string
 
 	if pm, ok := m.(*labels.PrefixMatcher); ok {
 		res, err = tuplesByPrefix(pm, tpls)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 
 	} else {
 		for i := 0; i < tpls.Len(); i++ {
 			vals, err := tpls.At(i)
 			if err != nil {
-				return errPostings{err: err}
+				return nil, err
 			}
 			if m.Matches(vals[0]) {
 				res = append(res, vals[0])
@@ -291,7 +307,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 	}
 
 	if len(res) == 0 {
-		return emptyPostings
+		return EmptyPostings(), nil
 	}
 
 	var rit []Postings
@@ -299,12 +315,12 @@
 	for _, v := range res {
 		it, err := r.index.Postings(m.Name(), v)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 		rit = append(rit, it)
 	}
 
-	return Merge(rit...)
+	return Merge(rit...), nil
 }
 
 func mergeStrings(a, b []string) []string {
@@ -342,11 +358,12 @@ type SeriesSet interface {
 	Err() error
 }
 
-type nopSeriesSet struct{}
+var emptySeriesSet = errSeriesSet{}
 
-func (nopSeriesSet) Next() bool { return false }
-func (nopSeriesSet) At() Series { return nil }
-func (nopSeriesSet) Err() error { return nil }
+// EmptySeriesSet returns a series set that's always empty.
+func EmptySeriesSet() SeriesSet {
+	return emptySeriesSet
+}
 
 // mergedSeriesSet takes two series sets as a single series set. The input series sets
 // must be sorted and sequential in time, i.e. if they have the same label set,
diff --git a/querier_test.go b/querier_test.go
index b6f95bf7ce..e302e29838 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -460,7 +460,8 @@ Outer:
 			maxt: c.maxt,
 		}
 
-		res := querier.Select(c.ms...)
+		res, err := querier.Select(c.ms...)
+		require.NoError(t, err)
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
@@ -632,7 +633,8 @@ Outer:
 			maxt: c.maxt,
 		}
 
-		res := querier.Select(c.ms...)
+		res, err := querier.Select(c.ms...)
+		require.NoError(t, err)
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
@@ -1228,7 +1230,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
 	sel = func(sets []SeriesSet) SeriesSet {
 		if len(sets) == 0 {
-			return nopSeriesSet{}
+			return EmptySeriesSet()
 		}
 		if len(sets) == 1 {
 			return sets[0]

From 3ef4326114868a97c1c1d9f82bbc56010e6faf57 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 13 Nov 2017 13:32:24 +0100
Subject: [PATCH 2/7] Refactor tombstone reader types
---
 block.go           | 24 +++++++++++---------
 compact.go         |  8 +++++--
 head.go            |  4 ++--
 head_test.go       |  2 +-
 querier.go         |  7 +++++-
 querier_test.go    | 18 +++++++--------
 tombstones.go      | 56 +++++++++++++++++++++++++++++-----------------
 tombstones_test.go |  8 +++----
 8 files changed, 77 insertions(+), 50 deletions(-)

diff --git a/block.go b/block.go
index f2fcf97ff2..59a9d4dc18 100644
--- a/block.go
+++ b/block.go
@@ -142,10 +142,9 @@ type Block struct {
 	dir  string
 	meta BlockMeta
 
-	chunkr ChunkReader
-	indexr IndexReader
-
-	tombstones tombstoneReader
+	chunkr     ChunkReader
+	indexr     IndexReader
+	tombstones TombstoneReader
 }
 
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -293,7 +292,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	ir := pb.indexr
 
 	// Choose only valid postings which have chunks in the time-range.
-	stones := map[uint64]Intervals{}
+	stones := memTombstones{}
 
 	var lset labels.Labels
 	var chks []ChunkMeta
@@ -325,16 +324,21 @@ Outer:
 		return p.Err()
 	}
 
-	// Merge the current and new tombstones.
-	for k, v := range stones {
-		pb.tombstones.add(k, v[0])
+	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for _, iv := range ivs {
+			stones.add(id, iv)
+			pb.meta.Stats.NumTombstones++
+		}
+		return nil
+	})
+	if err != nil {
+		return err
 	}
+	pb.tombstones = stones
 
 	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
 		return err
 	}
-
-	pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
diff --git a/compact.go b/compact.go
index 955ba3cafb..48d77f72c8 100644
--- a/compact.go
+++ b/compact.go
@@ -418,7 +418,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	}
 
 	// Create an empty tombstones file.
-	if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+	if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
 		return errors.Wrap(err, "write new tombstones file")
 	}
 
@@ -631,7 +631,11 @@ func (c *compactionSeriesSet) Next() bool {
 	}
 
 	var err error
-	c.intervals = c.tombstones.Get(c.p.At())
+	c.intervals, err = c.tombstones.Get(c.p.At())
+	if err != nil {
+		c.err = errors.Wrap(err, "get tombstones")
+		return false
+	}
 
 	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
 		c.err = errors.Wrapf(err, "get series %d", c.p.At())
diff --git a/head.go b/head.go
index 51a7896a2c..d8614add7e 100644
--- a/head.go
+++ b/head.go
@@ -66,7 +66,7 @@ type Head struct {
 
 	postings *memPostings // postings lists for terms
 
-	tombstones tombstoneReader
+	tombstones memTombstones
 }
 
 type headMetrics struct {
@@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 		values:     map[string]stringset{},
 		symbols:    map[string]struct{}{},
 		postings:   newUnorderedMemPostings(),
-		tombstones: newEmptyTombstoneReader(),
+		tombstones: memTombstones{},
 	}
 	h.metrics = newHeadMetrics(h, r)
 
diff --git a/head_test.go b/head_test.go
index 277519ad43..c42c19b5e5 100644
--- a/head_test.go
+++ b/head_test.go
@@ -318,7 +318,7 @@ func TestHeadDeleteSimple(t *testing.T) {
 Outer:
 	for _, c := range cases {
 		// Reset the tombstones.
-		head.tombstones = newEmptyTombstoneReader()
+		head.tombstones = memTombstones{}
 
 		// Delete the ranges.
 		for _, r := range c.intervals {
diff --git a/querier.go b/querier.go
index 89a9547952..faa29ecda8 100644
--- a/querier.go
+++ b/querier.go
@@ -465,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
 	var (
 		lset   labels.Labels
 		chunks []ChunkMeta
+		err    error
 	)
 Outer:
 	for s.p.Next() {
@@ -487,7 +488,11 @@
 		s.lset = lset
 		s.chks = chunks
 
-		s.intervals = s.tombstones.Get(s.p.At())
+		s.intervals, err = s.tombstones.Get(s.p.At())
+		if err != nil {
+			s.err = errors.Wrap(err, "get tombstones")
+			return false
+		}
 
 		if len(s.intervals) > 0 {
 			// Only those chunks that are not entirely deleted.
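
For illustration, here is a minimal sketch of how a caller consumes the error-returning TombstoneReader.Get used above. The printDeleted helper is hypothetical and not part of the patch; it assumes the fmt and github.com/pkg/errors imports that the package already uses.

	// Hypothetical helper: read the deletion intervals for one series
	// reference and handle the lookup error at the call site, instead of
	// letting it surface later in an unrelated execution path.
	func printDeleted(tr TombstoneReader, ref uint64) error {
		ivs, err := tr.Get(ref)
		if err != nil {
			return errors.Wrap(err, "get tombstones")
		}
		for _, iv := range ivs {
			fmt.Printf("series %d: deleted interval [%d, %d]\n", ref, iv.Mint, iv.Maxt)
		}
		return nil
	}
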
diff --git a/querier_test.go b/querier_test.go
index e302e29838..301a2b0467 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -454,7 +454,7 @@ Outer:
 		querier := &blockQuerier{
 			index:      ir,
 			chunks:     cr,
-			tombstones: newEmptyTombstoneReader(),
+			tombstones: EmptyTombstoneReader(),
 
 			mint: c.mint,
 			maxt: c.maxt,
@@ -506,7 +506,7 @@ func TestBlockQuerierDelete(t *testing.T) {
 			chunks [][]sample
 		}
 
-		tombstones tombstoneReader
+		tombstones TombstoneReader
 		queries    []query
 	}{
 		data: []struct {
@@ -554,13 +554,11 @@
 				},
 			},
 		},
-		tombstones: newTombstoneReader(
-			map[uint64]Intervals{
-				1: Intervals{{1, 3}},
-				2: Intervals{{1, 3}, {6, 10}},
-				3: Intervals{{6, 10}},
-			},
-		),
+		tombstones: memTombstones{
+			1: Intervals{{1, 3}},
+			2: Intervals{{1, 3}, {6, 10}},
+			3: Intervals{{6, 10}},
+		},
 
 		queries: []query{
 			{
@@ -736,7 +734,7 @@ func TestBaseChunkSeries(t *testing.T) {
 		bcs := &baseChunkSeries{
 			p:          newListPostings(tc.postings),
 			index:      mi,
-			tombstones: newEmptyTombstoneReader(),
+			tombstones: EmptyTombstoneReader(),
 		}
 
 		i := 0
diff --git a/tombstones.go b/tombstones.go
index d43cd0bd0c..8ca089e617 100644
--- a/tombstones.go
+++ b/tombstones.go
@@ -35,12 +35,17 @@ const (
 
 // TombstoneReader gives access to tombstone intervals by series reference.
 type TombstoneReader interface {
-	Get(ref uint64) Intervals
+	// Get returns deletion intervals for the series with the given reference.
+	Get(ref uint64) (Intervals, error)
 
+	// Iter calls the given function for each encountered interval.
+	Iter(func(uint64, Intervals) error) error
+
+	// Close closes any underlying resources.
 	Close() error
 }
 
-func writeTombstoneFile(dir string, tr tombstoneReader) error {
+func writeTombstoneFile(dir string, tr TombstoneReader) error {
 	path := filepath.Join(dir, tombstoneFilename)
 	tmp := path + ".tmp"
 	hash := newCRC32()
@@ -67,19 +72,21 @@
 
 	mw := io.MultiWriter(f, hash)
 
-	for k, v := range tr {
-		for _, itv := range v {
+	tr.Iter(func(ref uint64, ivs Intervals) error {
+		for _, iv := range ivs {
 			buf.reset()
-			buf.putUvarint64(k)
-			buf.putVarint64(itv.Mint)
-			buf.putVarint64(itv.Maxt)
+
+			buf.putUvarint64(ref)
+			buf.putVarint64(iv.Mint)
+			buf.putVarint64(iv.Maxt)
 
 			_, err = mw.Write(buf.get())
 			if err != nil {
 				return err
 			}
 		}
-	}
+		return nil
+	})
 
 	_, err = f.Write(hash.Sum(nil))
 	if err != nil {
@@ -100,7 +107,7 @@ type Stone struct {
 	intervals Intervals
 }
 
-func readTombstones(dir string) (tombstoneReader, error) {
+func readTombstones(dir string) (memTombstones, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
 	if err != nil {
 		return nil, err
@@ -131,7 +138,8 @@
 		return nil, errors.New("checksum did not match")
 	}
 
-	stonesMap := newEmptyTombstoneReader()
+	stonesMap := memTombstones{}
+
 	for d.len() > 0 {
 		k := d.uvarint64()
 		mint := d.varint64()
@@ -143,28 +151,36 @@
 		stonesMap.add(k, Interval{mint, maxt})
 	}
 
-	return newTombstoneReader(stonesMap), nil
+	return stonesMap, nil
 }
 
-type tombstoneReader map[uint64]Intervals
+type memTombstones map[uint64]Intervals
 
-func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
-	return tombstoneReader(ts)
+var emptyTombstoneReader = memTombstones{}
+
+// EmptyTombstoneReader returns a TombstoneReader that is always empty.
+func EmptyTombstoneReader() TombstoneReader {
+	return emptyTombstoneReader
 }
 
-func newEmptyTombstoneReader() tombstoneReader {
-	return tombstoneReader(make(map[uint64]Intervals))
+func (t memTombstones) Get(ref uint64) (Intervals, error) {
+	return t[ref], nil
 }
 
-func (t tombstoneReader) Get(ref uint64) Intervals {
-	return t[ref]
+func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
+	for ref, ivs := range t {
+		if err := f(ref, ivs); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
-func (t tombstoneReader) add(ref uint64, itv Interval) {
+func (t memTombstones) add(ref uint64, itv Interval) {
 	t[ref] = t[ref].add(itv)
 }
 
-func (tombstoneReader) Close() error {
+func (memTombstones) Close() error {
 	return nil
 }
diff --git a/tombstones_test.go b/tombstones_test.go
index 9265b76b38..eb124fc941 100644
--- a/tombstones_test.go
+++ b/tombstones_test.go
@@ -29,7 +29,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
 
 	ref := uint64(0)
 
-	stones := make(map[uint64]Intervals)
+	stones := memTombstones{}
 	// Generate the tombstones.
 	for i := 0; i < 100; i++ {
 		ref += uint64(rand.Int31n(10)) + 1
@@ -43,13 +43,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
 		stones[ref] = dranges
 	}
 
-	require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones)))
+	require.NoError(t, writeTombstoneFile(tmpdir, stones))
 
 	restr, err := readTombstones(tmpdir)
 	require.NoError(t, err)
-	exptr := newTombstoneReader(stones)
+
 	// Compare the two readers.
-	require.Equal(t, exptr, restr)
+	require.Equal(t, stones, restr)
 }
 
 func TestAddingNewIntervals(t *testing.T) {

From f1512a368acb6e596a219f7f5322cd59bfbd4ecf Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Mon, 13 Nov 2017 13:57:10 +0100
Subject: [PATCH 3/7] Expose ChunkSeriesSet and lookup methods.
---
 block.go   |  3 +--
 compact.go | 13 +++++--------
 head.go    |  3 +--
 querier.go | 66 +++++++++++++++++++++++++++---------------------------
 4 files changed, 38 insertions(+), 47 deletions(-)

diff --git a/block.go b/block.go
index 59a9d4dc18..8456cb3762 100644
--- a/block.go
+++ b/block.go
@@ -283,8 +283,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		return ErrClosing
 	}
 
-	pr := newPostingsReader(pb.indexr)
-	p, absent, err := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(pb.indexr, ms...)
 	if err != nil {
 		return errors.Wrap(err, "select series")
 	}
diff --git a/compact.go b/compact.go
index 48d77f72c8..2dc939bc1e 100644
--- a/compact.go
+++ b/compact.go
@@ -453,7 +453,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
 	var (
-		set        compactionSet
+		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
 		closers    = []io.Closer{}
 	)
@@ -597,18 +597,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	return nil
 }
 
-type compactionSet interface {
-	Next() bool
-	At() (labels.Labels, []ChunkMeta, Intervals)
-	Err() error
-}
-
 type compactionSeriesSet struct {
 	p          Postings
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
-	series     SeriesSet
 
 	l labels.Labels
 	c []ChunkMeta
@@ -679,7 +672,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
 }
 
 type compactionMerger struct {
-	a, b compactionSet
+	a, b ChunkSeriesSet
 
 	aok, bok bool
 	l        labels.Labels
@@ -692,7 +685,7 @@ type compactionSeries struct {
 	chunks []*ChunkMeta
 }
 
-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,
diff --git a/head.go b/head.go
index d8614add7e..d149cb1d9c 100644
--- a/head.go
+++ b/head.go
@@ -574,8 +574,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 
 	ir := h.indexRange(mint, maxt)
 
-	pr := newPostingsReader(ir)
-	p, absent, err := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(ir, ms...)
 	if err != nil {
 		return errors.Wrap(err, "select series")
 	}
diff --git a/querier.go b/querier.go
index faa29ecda8..37672c7156 100644
--- a/querier.go
+++ b/querier.go
@@ -151,22 +151,13 @@ type blockQuerier struct {
 }
 
 func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
-	pr := newPostingsReader(q.index)
-
-	p, absent, err := pr.Select(ms...)
+	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
 	if err != nil {
 		return nil, err
 	}
-
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
-			set: &baseChunkSeries{
-				p:      p,
-				index:  q.index,
-				absent: absent,
-
-				tombstones: q.tombstones,
-			},
+			set:    base,
 			chunks: q.chunks,
 			mint:   q.mint,
 			maxt:   q.maxt,
@@ -208,16 +199,10 @@ func (q *blockQuerier) Close() error {
 	return merr.Err()
 }
 
-// postingsReader is used to select matching postings from an IndexReader.
-type postingsReader struct {
-	index IndexReader
-}
-
-func newPostingsReader(i IndexReader) *postingsReader {
-	return &postingsReader{index: i}
-}
-
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) {
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. It returns a list of label names that must be manually
+// checked to not exist in series the postings list points to.
+func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -229,16 +214,13 @@
 			absent = append(absent, m.Name())
 			continue
 		}
-		it, err := r.selectSingle(m)
+		it, err := postingsForMatcher(index, m)
 		if err != nil {
 			return nil, nil, err
 		}
 		its = append(its, it)
 	}
-
-	p := Intersect(its...)
-
-	return r.index.SortedPostings(p), absent, nil
+	return index.SortedPostings(Intersect(its...)), absent, nil
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -272,17 +254,17 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }
 
-func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) {
+func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := r.index.Postings(em.Name(), em.Value())
+		it, err := index.Postings(em.Name(), em.Value())
 		if err != nil {
 			return nil, err
 		}
 		return it, nil
 	}
 
-	tpls, err := r.index.LabelValues(m.Name())
+	tpls, err := index.LabelValues(m.Name())
 	if err != nil {
 		return nil, err
 	}
@@ -313,7 +295,7 @@
 	var rit []Postings
 
 	for _, v := range res {
-		it, err := r.index.Postings(m.Name(), v)
+		it, err := index.Postings(m.Name(), v)
 		if err != nil {
 			return nil, err
 		}
@@ -435,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }
 
-type chunkSeriesSet interface {
+type ChunkSeriesSet interface {
 	Next() bool
 	At() (labels.Labels, []ChunkMeta, Intervals)
 	Err() error
@@ -455,6 +437,24 @@ type baseChunkSeries struct {
 	err error
 }
 
+// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
+// over them. It drops chunks based on tombstones in the given reader.
+func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
+	if tr == nil {
+		tr = EmptyTombstoneReader()
+	}
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return nil, err
+	}
+	return &baseChunkSeries{
+		p:          p,
+		index:      ir,
+		tombstones: tr,
+		absent:     absent,
+	}, nil
+}
+
 func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
 	return s.lset, s.chks, s.intervals
 }
@@ -518,7 +518,7 @@
 // with known chunk references. It filters out chunks that do not fit the
 // given time range.
 type populatedChunkSeries struct {
-	set    chunkSeriesSet
+	set    ChunkSeriesSet
 	chunks ChunkReader
 
 	mint, maxt int64
@@ -575,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
 
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
-	set chunkSeriesSet
+	set ChunkSeriesSet
 	err error
 	cur Series

From a031cf7424079c17746314bc29e3ff907d3b97ba Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Tue, 14 Nov 2017 15:25:30 +0100
Subject: [PATCH 4/7] Return ULID in Compactor
---
 compact.go | 6 +++---
 db.go      | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/compact.go b/compact.go
index 2dc939bc1e..35cb36a63c 100644
--- a/compact.go
+++ b/compact.go
@@ -52,7 +52,7 @@ type Compactor interface {
 	Plan(dir string) ([]string, error)
 
 	// Write persists a Block into a directory.
-	Write(dest string, b BlockReader, mint, maxt int64) error
+	Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
 
 	// Compact runs compaction against the provided directories. Must
 	// only be called concurrently with results of Plan().
@@ -321,7 +321,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
 }
 
-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
@@ -333,7 +333,7 @@
 	meta.Compaction.Level = 1
 	meta.Compaction.Sources = []ulid.ULID{uid}
 
-	return c.write(dest, meta, b)
+	return uid, c.write(dest, meta, b)
 }
 
 // instrumentedChunkWriter is used for level 1 compactions to record statistics
diff --git a/db.go b/db.go
index f1972f00bc..4e458dea04 100644
--- a/db.go
+++ b/db.go
@@ -344,7 +344,7 @@ func (db *DB) compact() (changes bool, err error) {
 			mint: mint,
 			maxt: maxt,
 		}
-		if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+		if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
 			return changes, errors.Wrap(err, "persist head block")
 		}
 		changes = true
@@ -615,7 +615,8 @@ func (db *DB) Snapshot(dir string) error {
 			return errors.Wrap(err, "error snapshotting headblock")
 		}
 	}
-	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	return errors.Wrap(err, "snapshot head block")
 }
 
 // Querier returns a new querier over the data partition for the given time range.

From 1fc94a02d12a5ed976993ea8ad33f4f5fd554143 Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Tue, 21 Nov 2017 16:45:02 +0530
Subject: [PATCH 5/7] Don't retry failed compactions.

Fixes prometheus/prometheus#3487

Signed-off-by: Goutham Veeramachaneni
---
 block.go        |  6 +++
 block_test.go   | 40 ++++++++++++++++++++
 compact.go      | 36 ++++++++++++++++--
 compact_test.go | 98 +++++++++++++++++++++++++++++++++++++++++++------
 db_test.go      |  8 ++--
 5 files changed, 170 insertions(+), 18 deletions(-)

diff --git a/block.go b/block.go
index 034f336730..a904008afd 100644
--- a/block.go
+++ b/block.go
@@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
 	Level int `json:"level"`
 	// ULIDs of all source head blocks that went into the block.
 	Sources []ulid.ULID `json:"sources,omitempty"`
+	Failed bool `json:"failed,omitempty"`
 }
 
 const (
@@ -245,6 +246,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
 	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
 }
 
+func (pb *Block) setCompactionFailed() error {
+	pb.meta.Compaction.Failed = true
+	return writeMetaFile(pb.dir, &pb.meta)
+}
+
 type blockIndexReader struct {
 	IndexReader
 	b *Block
diff --git a/block_test.go b/block_test.go
index e75d4ac3f2..e7b6625284 100644
--- a/block_test.go
+++ b/block_test.go
@@ -12,3 +12,43 @@
 // limitations under the License.
 package tsdb
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func TestSetCompactionFailed(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "test-tsdb")
+	Ok(t, err)
+
+	b := createEmptyBlock(t, tmpdir)
+
+	Equals(t, false, b.meta.Compaction.Failed)
+	Ok(t, b.setCompactionFailed())
+	Equals(t, true, b.meta.Compaction.Failed)
+	Ok(t, b.Close())
+
+	b, err = OpenBlock(tmpdir, nil)
+	Ok(t, err)
+	Equals(t, true, b.meta.Compaction.Failed)
+}
+
+func createEmptyBlock(t *testing.T, dir string) *Block {
+	Ok(t, os.MkdirAll(dir, 0777))
+
+	Ok(t, writeMetaFile(dir, &BlockMeta{}))
+
+	ir, err := newIndexWriter(dir)
+	Ok(t, err)
+	Ok(t, ir.Close())
+
+	Ok(t, os.MkdirAll(chunkDir(dir), 0777))
+
+	Ok(t, writeTombstoneFile(dir, newEmptyTombstoneReader()))
+
+	b, err := OpenBlock(dir, nil)
+	Ok(t, err)
+	return b
+}
diff --git a/compact.go b/compact.go
index 955ba3cafb..62868d38c9 100644
--- a/compact.go
+++ b/compact.go
@@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
 			continue
 		}
 
+	Outer:
 		for _, p := range parts {
+			// Do not select the range if it has a block whose compaction failed.
+			for _, dm := range p {
+				if dm.meta.Compaction.Failed {
+					continue Outer
+				}
+			}
+
 			mint := p[0].meta.MinTime
 			maxt := p[len(p)-1].meta.MaxTime
 			// Pick the range of blocks if it spans the full range (potentially with gaps)
@@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var blocks []BlockReader
+	var bs []*Block
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
@@ -313,12 +322,27 @@
 		metas = append(metas, meta)
 		blocks = append(blocks, b)
+		bs = append(bs, b)
 	}
 
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
-	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	if err == nil {
+		return nil
+	}
+
+	var merr MultiError
+	merr.Add(err)
+
+	for _, b := range bs {
+		if err := b.setCompactionFailed(); err != nil {
+			merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
+		}
+	}
+
+	return merr
 }
 
 func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
@@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
 
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 	level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
 
+	dir := filepath.Join(dest, meta.ULID.String())
+	tmp := dir + ".tmp"
+
 	defer func(t time.Time) {
 		if err != nil {
 			c.metrics.failed.Inc()
+			// TODO(gouthamve): Handle error how?
+			if err := os.RemoveAll(tmp); err != nil {
+				level.Error(c.logger).Log("msg", "failed to remove tmp folder after failed compaction", "err", err.Error())
+			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
-	dir := filepath.Join(dest, meta.ULID.String())
-	tmp := dir + ".tmp"
-
 	if err = os.RemoveAll(tmp); err != nil {
 		return err
 	}
diff --git a/compact_test.go b/compact_test.go
index d1650cd16a..a8ae48fce0 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -14,8 +14,13 @@
 package tsdb
 
 import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
 	"testing"
 
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -157,17 +162,6 @@ func TestLeveledCompactor_plan(t *testing.T) {
 	}, nil)
 	require.NoError(t, err)
 
-	metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
-		meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
-		if stats != nil {
-			meta.Stats = *stats
-		}
-		return dirMeta{
-			dir:  name,
-			meta: meta,
-		}
-	}
-
 	cases := []struct {
 		metas    []dirMeta
 		expected []string
@@ -274,3 +268,85 @@
 		require.Equal(t, c.expected, res, "test case %d", i)
 	}
 }
+
+func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	Ok(t, err)
+
+	cases := []struct {
+		metas []dirMeta
+	}{
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+			},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+			},
+		},
+	}
+
+	for _, c := range cases {
+		c.metas[1].meta.Compaction.Failed = true
+		res, err := compactor.plan(c.metas)
+		Ok(t, err)
+
+		Equals(t, []string(nil), res)
+	}
+}
+
+func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	Ok(t, err)
+
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
+
+	NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
+	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
+	Assert(t, os.IsNotExist(err), "directory is not cleaned up")
+}
+
+func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
+	meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
+	if stats != nil {
+		meta.Stats = *stats
+	}
+	return dirMeta{
+		dir:  name,
+		meta: meta,
+	}
+}
+
+type erringBReader struct{}
+
+func (erringBReader) Index() (IndexReader, error)          { return nil, errors.New("index") }
+func (erringBReader) Chunks() (ChunkReader, error)         { return nil, errors.New("chunks") }
+func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
diff --git a/db_test.go b/db_test.go
index 9f91705e7e..4be2d889e0 100644
--- a/db_test.go
+++ b/db_test.go
@@ -27,9 +27,10 @@ import (
 )
 
 func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
-	tmpdir, _ := ioutil.TempDir("", "test")
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
 
-	db, err := Open(tmpdir, nil, nil, opts)
+	db, err = Open(tmpdir, nil, nil, opts)
 	require.NoError(t, err)
 
 	// Do not close the test database by default as it will deadlock on test failures.
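
As a sketch of the selection rule pinned down by TestRangeWithFailedCompactionWontGetSelected above: a candidate range is skipped as soon as any of its blocks carries the Failed marker. The hasFailedBlock helper is illustrative only, not code from this patch.

	// Illustrative only: mirrors the Outer loop added to selectDirs.
	// A range containing a block from a previously failed compaction is
	// never selected again, so the same failure is not retried forever.
	func hasFailedBlock(part []dirMeta) bool {
		for _, dm := range part {
			if dm.meta.Compaction.Failed {
				return true
			}
		}
		return false
	}
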
@@ -526,7 +527,8 @@ func TestDB_e2e(t *testing.T) {
 }
 
 func TestWALFlushedOnDBClose(t *testing.T) {
-	tmpdir, _ := ioutil.TempDir("", "test")
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
 	defer os.RemoveAll(tmpdir)
 
 	db, err := Open(tmpdir, nil, nil, nil)

From 41fd9c66ef5f5b90de1feb5ca5281e618e1c98f3 Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Thu, 23 Nov 2017 18:57:10 +0530
Subject: [PATCH 6/7] Close the retention blocks before deleting them.

Signed-off-by: Goutham Veeramachaneni
---
 db.go      | 36 +++++++++++++++++++---------------
 db_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/db.go b/db.go
index f1972f00bc..c6c31cd2cc 100644
--- a/db.go
+++ b/db.go
@@ -277,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
 	}
 
 	db.mtx.RLock()
-	defer db.mtx.RUnlock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
 
-	if len(db.blocks) == 0 {
+	if len(blocks) == 0 {
 		return false, nil
 	}
 
-	last := db.blocks[len(db.blocks)-1]
-	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+	last := blocks[len(blocks)-1]
 
-	return retentionCutoff(db.dir, mint)
+	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
+	dirs, err := retentionCutoffDirs(db.dir, mint)
+	if err != nil {
+		return false, err
+	}
+
+	// This will close the blocks and then delete their directories.
+	return len(dirs) > 0, db.reload(dirs...)
 }
 
 // Appender opens a new appender against the database.
@@ -388,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
 	return changes, nil
 }
 
-// retentionCutoff deletes all directories of blocks in dir that are strictly
+// retentionCutoffDirs returns all directories of blocks in dir that are strictly
 // before mint.
-func retentionCutoff(dir string, mint int64) (bool, error) {
+func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
 	df, err := fileutil.OpenDir(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "open directory")
+		return nil, errors.Wrapf(err, "open directory")
 	}
 	defer df.Close()
 
 	dirs, err := blockDirs(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "list block dirs %s", dir)
+		return nil, errors.Wrapf(err, "list block dirs %s", dir)
 	}
 
-	changes := false
+	delDirs := []string{}
 
 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
 		if err != nil {
-			return changes, errors.Wrapf(err, "read block meta %s", dir)
+			return nil, errors.Wrapf(err, "read block meta %s", dir)
 		}
 		// The first block we encounter marks that we crossed the boundary
 		// of deletable blocks.
 		if meta.MaxTime >= mint {
 			break
 		}
-		changes = true
 
-		if err := os.RemoveAll(dir); err != nil {
-			return changes, err
-		}
+		delDirs = append(delDirs, dir)
 	}
 
-	return changes, fileutil.Fsync(df)
+	return delDirs, nil
 }
 
 func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
diff --git a/db_test.go b/db_test.go
index 9f91705e7e..da435e98e8 100644
--- a/db_test.go
+++ b/db_test.go
@@ -551,3 +551,60 @@ func TestWALFlushedOnDBClose(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, values, []string{"labelvalue"})
 }
+
+func TestDB_Retention(t *testing.T) {
+	tmpdir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(tmpdir)
+
+	db, err := Open(tmpdir, nil, nil, nil)
+	require.NoError(t, err)
+
+	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
+
+	app := db.Appender()
+	_, err = app.Add(lbls, 0, 1)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// create snapshot to make it create a block.
+	// TODO(gouthamve): Add a method to compact headblock.
+	snap, err := ioutil.TempDir("", "snap")
+	require.NoError(t, err)
+	require.NoError(t, db.Snapshot(snap))
+	require.NoError(t, db.Close())
+	defer os.RemoveAll(snap)
+
+	// reopen DB from snapshot
+	db, err = Open(snap, nil, nil, nil)
+	require.NoError(t, err)
+
+	Equals(t, 1, len(db.blocks))
+
+	app = db.Appender()
+	_, err = app.Add(lbls, 100, 1)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Snapshot again to create another block.
+	snap, err = ioutil.TempDir("", "snap")
+	require.NoError(t, err)
+	require.NoError(t, db.Snapshot(snap))
+	require.NoError(t, db.Close())
+	defer os.RemoveAll(snap)
+
+	// reopen DB from snapshot
+	db, err = Open(snap, nil, nil, &Options{
+		RetentionDuration: 10,
+		BlockRanges:       []int64{50},
+	})
+	require.NoError(t, err)
+
+	Equals(t, 2, len(db.blocks))
+
+	// Now call retention.
+	changes, err := db.retentionCutoff()
+	Ok(t, err)
+	Assert(t, changes, "there should be changes")
+	Equals(t, 1, len(db.blocks))
+	Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify it's the right block.
+}

From cf7f6108fde56384218570e7d145d9bddbfe2890 Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Wed, 29 Nov 2017 08:28:55 +0100
Subject: [PATCH 7/7] Order postings lists in index file by key

Aligning postings lists for similar keys close to each other improves
page cache hit rates in typical queries that select postings for
multiple label pairs with the same name.
---
 compact.go  |  2 +-
 postings.go | 19 +++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/compact.go b/compact.go
index 35cb36a63c..427731e312 100644
--- a/compact.go
+++ b/compact.go
@@ -589,7 +589,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 	}
 
-	for l := range postings.m {
+	for _, l := range postings.sortedKeys() {
 		if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
 			return errors.Wrap(err, "write postings")
 		}
diff --git a/postings.go b/postings.go
index 63fb1e31a0..1ebc7c5769 100644
--- a/postings.go
+++ b/postings.go
@@ -50,6 +50,25 @@ func newUnorderedMemPostings() *memPostings {
 	}
 }
 
+// sortedKeys returns a list of sorted label keys of the postings.
+func (p *memPostings) sortedKeys() []labels.Label {
+	p.mtx.RLock()
+	keys := make([]labels.Label, 0, len(p.m))
+
+	for l := range p.m {
+		keys = append(keys, l)
+	}
+	p.mtx.RUnlock()
+
+	sort.Slice(keys, func(i, j int) bool {
+		if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
+			return d < 0
+		}
+		return keys[i].Value < keys[j].Value
+	})
+	return keys
+}
+
 // Postings returns an iterator over the postings list for s.
 func (p *memPostings) get(name, value string) Postings {
 	p.mtx.RLock()
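
For reference, the key ordering introduced by sortedKeys can be exercised in isolation. The following is a self-contained sketch; the import path for the labels package is an assumption based on the repository layout.

	package main

	import (
		"fmt"
		"sort"
		"strings"

		"github.com/prometheus/tsdb/labels"
	)

	func main() {
		// Same comparator as sortedKeys: order by label name first, then by
		// value, so postings lists sharing a name land next to each other in
		// the index file and queries touching several values of one name hit
		// neighboring pages.
		keys := []labels.Label{
			{Name: "job", Value: "node"},
			{Name: "__name__", Value: "up"},
			{Name: "job", Value: "api"},
		}
		sort.Slice(keys, func(i, j int) bool {
			if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
				return d < 0
			}
			return keys[i].Value < keys[j].Value
		})
		fmt.Println(keys) // [{__name__ up} {job api} {job node}]
	}
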