From 6ee254e353c5723de6594c9bb57e6ab8081a5fb7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 20 Sep 2017 18:08:57 +0200 Subject: [PATCH] Ensure postings are always sorted IDs for new series are handed out before the postings are locked. Thus series are not indexed in order of their IDs, which could result in only partially sorted postings list. Iterating over those silently skipped elements as the sort invariant was violated. --- compact.go | 2 +- head.go | 6 ++---- postings.go | 29 ++++++++++++++++++++++++++--- postings_test.go | 16 +++++++++------- 4 files changed, 38 insertions(+), 15 deletions(-) diff --git a/compact.go b/compact.go index a3bc7d17a3..17a4da0418 100644 --- a/compact.go +++ b/compact.go @@ -457,7 +457,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexr := b.Index() - all, err := indexr.Postings("", "") + all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } diff --git a/head.go b/head.go index a5ce94e455..3ead78fc49 100644 --- a/head.go +++ b/head.go @@ -207,7 +207,7 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref) + h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint) continue } _, chunkCreated := ms.append(s.T, s.V) @@ -267,7 +267,7 @@ func (h *Head) Truncate(mint int64) error { start = time.Now() - p, err := h.indexRange(mint, math.MaxInt64).Postings("", "") + p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } @@ -1038,8 +1038,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo return prev, false } s.hashes[i].set(hash, series) - - s.hashes[i][hash] = append(s.hashes[i][hash], series) s.locks[i].Unlock() i = series.ref & stripeMask diff --git a/postings.go b/postings.go index 97a29ab197..0e51b221b3 100644 --- a/postings.go +++ b/postings.go @@ -45,7 +45,7 @@ func (p *memPostings) get(name, value string) Postings { return newListPostings(l) } -var allLabel = labels.Label{} +var allPostingsKey = labels.Label{} // add adds a document to the index. The caller has to ensure that no // term argument appears twice. @@ -53,13 +53,36 @@ func (p *memPostings) add(id uint64, lset labels.Labels) { p.mtx.Lock() for _, l := range lset { - p.m[l] = append(p.m[l], id) + p.addFor(id, l) } - p.m[allLabel] = append(p.m[allLabel], id) + p.addFor(id, allPostingsKey) p.mtx.Unlock() } +func (p *memPostings) addFor(id uint64, l labels.Label) { + list := append(p.m[l], id) + p.m[l] = list + + // There is no guarantee that no higher ID was inserted before as they may + // be generated independently before adding them to postings. + // We repair order violations on insert. The invariant is that the first n-1 + // items in the list are already sorted. + for i := len(list) - 1; i >= 1; i-- { + if list[i] >= list[i-1] { + break + } + list[i], list[i-1] = list[i-1], list[i] + } +} + +func expandPostings(p Postings) (res []uint64, err error) { + for p.Next() { + res = append(res, p.At()) + } + return res, p.Err() +} + // Postings provides iterative access over a postings list. type Postings interface { // Next advances the iterator and returns true if another value was found. diff --git a/postings_test.go b/postings_test.go index 5d726ca3aa..48cd2b6088 100644 --- a/postings_test.go +++ b/postings_test.go @@ -21,6 +21,15 @@ import ( "github.com/stretchr/testify/require" ) +func TestMemPostings_addFor(t *testing.T) { + p := newMemPostings() + p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} + + p.addFor(5, allPostingsKey) + + require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey]) +} + type mockPostings struct { next func() bool seek func(uint64) bool @@ -33,13 +42,6 @@ func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) } func (m *mockPostings) Value() uint64 { return m.value() } func (m *mockPostings) Err() error { return m.err() } -func expandPostings(p Postings) (res []uint64, err error) { - for p.Next() { - res = append(res, p.At()) - } - return res, p.Err() -} - func TestIntersect(t *testing.T) { var cases = []struct { a, b []uint64