From 96d7f540d4463849df9fb2e7b071f49bdf7afb9c Mon Sep 17 00:00:00 2001
From: Fabian Reinartz <fab.reinartz@gmail.com>
Date: Sat, 5 Aug 2017 13:31:48 +0200
Subject: [PATCH] Persist series without allocating the full set

Change index persistence for series to not be accumulated in memory
before being written as one large batch. `Labels` and `ChunkMeta`
objects are reused.
This cuts down memory spikes during compaction of multiple blocks
significantly.

As part of the the Index{Reader,Writer} now have an explicit notion of
symbols and series must be inserted in order.
---
 Documentation/format/index.md |   2 -
 block.go                      |   9 +-
 chunks/chunk.go               |  10 +-
 chunks/xor.go                 |   3 +-
 compact.go                    |  55 ++++++--
 head.go                       |  87 +++++++-----
 index.go                      | 241 +++++++++++++++++++---------------
 index_test.go                 | 130 ++++++++++--------
 querier.go                    |  17 +--
 querier_test.go               |  12 +-
 10 files changed, 327 insertions(+), 239 deletions(-)

diff --git a/Documentation/format/index.md b/Documentation/format/index.md
index c6e3a444a1..3beebb6fed 100644
--- a/Documentation/format/index.md
+++ b/Documentation/format/index.md
@@ -69,8 +69,6 @@ The file offset to the beginning of a series serves as the series' ID in all sub
 
 ```
 ┌───────────────────────────────────────┐
-│  #series <4b>                         │
-├───────────────────────────────────────┤
 │ ┌───────────────────────────────────┐ │
 │ │   series_1                        │ │
 │ ├───────────────────────────────────┤ │
diff --git a/block.go b/block.go
index 45c8b60cfc..53f8b6c4fb 100644
--- a/block.go
+++ b/block.go
@@ -251,9 +251,12 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	// Choose only valid postings which have chunks in the time-range.
 	stones := map[uint32]intervals{}
 
+	var lset labels.Labels
+	var chks []*ChunkMeta
+
 Outer:
 	for p.Next() {
-		lset, chunks, err := ir.Series(p.At())
+		err := ir.Series(p.At(), &lset, &chks)
 		if err != nil {
 			return err
 		}
@@ -264,10 +267,10 @@ Outer:
 			}
 		}
 
-		for _, chk := range chunks {
+		for _, chk := range chks {
 			if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
 				// Delete only until the current vlaues and not beyond.
-				tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime)
+				tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
 				stones[p.At()] = intervals{{tmin, tmax}}
 				continue Outer
 			}
diff --git a/chunks/chunk.go b/chunks/chunk.go
index 4eeb9c5d79..6bed4455fe 100644
--- a/chunks/chunk.go
+++ b/chunks/chunk.go
@@ -13,10 +13,7 @@
 
 package chunks
 
-import (
-	"encoding/binary"
-	"fmt"
-)
+import "fmt"
 
 // Encoding is the identifier for a chunk encoding.
 type Encoding uint8
@@ -50,10 +47,7 @@ type Chunk interface {
 func FromData(e Encoding, d []byte) (Chunk, error) {
 	switch e {
 	case EncXOR:
-		return &XORChunk{
-			b:   &bstream{count: 0, stream: d},
-			num: binary.BigEndian.Uint16(d),
-		}, nil
+		return &XORChunk{b: &bstream{count: 0, stream: d}}, nil
 	}
 	return nil, fmt.Errorf("unknown chunk encoding: %d", e)
 }
diff --git a/chunks/xor.go b/chunks/xor.go
index e9bdef0741..501db704ad 100644
--- a/chunks/xor.go
+++ b/chunks/xor.go
@@ -52,8 +52,7 @@ import (
 
 // XORChunk holds XOR encoded sample data.
 type XORChunk struct {
-	b   *bstream
-	num uint16
+	b *bstream
 }
 
 // NewXORChunk returns a new chunk with XOR encoding of the given size.
diff --git a/compact.go b/compact.go
index e973b3abfd..0027cd50ce 100644
--- a/compact.go
+++ b/compact.go
@@ -398,17 +398,31 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
 // populateBlock fills the index and chunk writers with new data gathered as the union
 // of the provided blocks. It returns meta information for the new block.
 func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
-	var set compactionSet
-	var metas []BlockMeta
-
+	var (
+		set        compactionSet
+		metas      []BlockMeta
+		allSymbols = make(map[string]struct{}, 1<<16)
+	)
 	for i, b := range blocks {
 		metas = append(metas, b.Meta())
 
-		all, err := b.Index().Postings("", "")
+		symbols, err := b.Index().Symbols()
+		if err != nil {
+			return nil, errors.Wrap(err, "read symbols")
+		}
+		for s := range symbols {
+			allSymbols[s] = struct{}{}
+		}
+
+		indexr := b.Index()
+
+		all, err := indexr.Postings("", "")
 		if err != nil {
 			return nil, err
 		}
-		s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
+		all = indexr.SortedPostings(all)
+
+		s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
 
 		if i == 0 {
 			set = s
@@ -428,6 +442,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
 		meta     = compactBlockMetas(metas...)
 	)
 
+	if err := indexw.AddSymbols(allSymbols); err != nil {
+		return nil, errors.Wrap(err, "add symbols")
+	}
+
 	for set.Next() {
 		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
 
@@ -461,7 +479,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
 			return nil, err
 		}
 
-		indexw.AddSeries(i, lset, chks...)
+		if err := indexw.AddSeries(i, lset, chks...); err != nil {
+			return nil, errors.Wrapf(err, "add series")
+		}
 
 		meta.Stats.NumChunks += uint64(len(chks))
 		meta.Stats.NumSeries++
@@ -525,6 +545,7 @@ type compactionSeriesSet struct {
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
+	series     SeriesSet
 
 	l         labels.Labels
 	c         []*ChunkMeta
@@ -545,11 +566,9 @@ func (c *compactionSeriesSet) Next() bool {
 	if !c.p.Next() {
 		return false
 	}
-
 	c.intervals = c.tombstones.Get(c.p.At())
 
-	c.l, c.c, c.err = c.index.Series(c.p.At())
-	if c.err != nil {
+	if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil {
 		return false
 	}
 
@@ -629,14 +648,24 @@ func (c *compactionMerger) Next() bool {
 	if !c.aok && !c.bok || c.Err() != nil {
 		return false
 	}
+	// While advancing child iterators the memory used for labels and chunks
+	// may be reused. When picking a series we have to store the result.
+	var lset labels.Labels
+	var chks []*ChunkMeta
 
 	d := c.compare()
 	// Both sets contain the current series. Chain them into a single one.
 	if d > 0 {
-		c.l, c.c, c.intervals = c.b.At()
+		lset, chks, c.intervals = c.b.At()
+		c.l = append(c.l[:0], lset...)
+		c.c = append(c.c[:0], chks...)
+
 		c.bok = c.b.Next()
 	} else if d < 0 {
-		c.l, c.c, c.intervals = c.a.At()
+		lset, chks, c.intervals = c.a.At()
+		c.l = append(c.l[:0], lset...)
+		c.c = append(c.c[:0], chks...)
+
 		c.aok = c.a.Next()
 	} else {
 		l, ca, ra := c.a.At()
@@ -645,8 +674,8 @@ func (c *compactionMerger) Next() bool {
 			ra = ra.add(r)
 		}
 
-		c.l = l
-		c.c = append(ca, cb...)
+		c.l = append(c.l[:0], l...)
+		c.c = append(append(c.c[:0], ca...), cb...)
 		c.intervals = ra
 
 		c.aok = c.a.Next()
diff --git a/head.go b/head.go
index cb8e7329a8..9e99d3777d 100644
--- a/head.go
+++ b/head.go
@@ -67,6 +67,7 @@ type HeadBlock struct {
 	// to their chunk descs.
 	hashes map[uint64][]*memSeries
 
+	symbols  map[string]struct{}
 	values   map[string]stringset // label names to possible values
 	postings *memPostings         // postings lists for terms
 
@@ -117,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
 		series:     []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
 		hashes:     map[uint64][]*memSeries{},
 		values:     map[string]stringset{},
+		symbols:    map[string]struct{}{},
 		postings:   &memPostings{m: make(map[term][]uint32)},
 		meta:       *meta,
 		tombstones: newEmptyTombstoneReader(),
@@ -332,7 +334,12 @@ func (h *HeadBlock) Snapshot(snapshotDir string) error {
 func (h *HeadBlock) Dir() string { return h.dir }
 
 // Index returns an IndexReader against the block.
-func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
+func (h *HeadBlock) Index() IndexReader {
+	h.mtx.RLock()
+	defer h.mtx.RUnlock()
+
+	return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)}
+}
 
 // Chunks returns a ChunkReader against the block.
 func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
@@ -340,14 +347,10 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
 // Querier returns a new Querier against the block for the range [mint, maxt].
 func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 	h.mtx.RLock()
-	defer h.mtx.RUnlock()
-
 	if h.closed {
 		panic(fmt.Sprintf("block %s already closed", h.dir))
 	}
-
-	// Reference on the original slice to use for postings mapping.
-	series := h.series[:]
+	h.mtx.RUnlock()
 
 	return &blockQuerier{
 		mint:       mint,
@@ -355,27 +358,6 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 		index:      h.Index(),
 		chunks:     h.Chunks(),
 		tombstones: h.Tombstones(),
-
-		postingsMapper: func(p Postings) Postings {
-			ep := make([]uint32, 0, 64)
-
-			for p.Next() {
-				// Skip posting entries that include series added after we
-				// instantiated the querier.
-				if int(p.At()) >= len(series) {
-					break
-				}
-				ep = append(ep, p.At())
-			}
-			if err := p.Err(); err != nil {
-				return errPostings{err: errors.Wrap(err, "expand postings")}
-			}
-
-			sort.Slice(ep, func(i, j int) bool {
-				return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0
-			})
-			return newListPostings(ep)
-		},
 	}
 }
 
@@ -661,6 +643,12 @@ func (c *safeChunk) Iterator() chunks.Iterator {
 
 type headIndexReader struct {
 	*HeadBlock
+	// Highest series that existed when the index reader was instantiated.
+	maxSeries uint32
+}
+
+func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
+	return h.symbols, nil
 }
 
 // LabelValues returns the possible label values
@@ -689,33 +677,59 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) {
 	return h.postings.get(term{name: name, value: value}), nil
 }
 
-// Series returns the series for the given reference.
-func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
+func (h *headIndexReader) SortedPostings(p Postings) Postings {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
 
-	if int(ref) >= len(h.series) {
-		return nil, nil, ErrNotFound
+	ep := make([]uint32, 0, 1024)
+
+	for p.Next() {
+		// Skip posting entries that include series added after we
+		// instantiated the index reader.
+		if p.At() > h.maxSeries {
+			break
+		}
+		ep = append(ep, p.At())
+	}
+	if err := p.Err(); err != nil {
+		return errPostings{err: errors.Wrap(err, "expand postings")}
+	}
+
+	sort.Slice(ep, func(i, j int) bool {
+		return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0
+	})
+	return newListPostings(ep)
+}
+
+// Series returns the series for the given reference.
+func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
+	h.mtx.RLock()
+	defer h.mtx.RUnlock()
+
+	if ref > h.maxSeries {
+		return ErrNotFound
 	}
 
 	s := h.series[ref]
 	if s == nil {
-		return nil, nil, ErrNotFound
+		return ErrNotFound
 	}
-	metas := make([]*ChunkMeta, 0, len(s.chunks))
+	*lbls = append((*lbls)[:0], s.lset...)
 
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
 
+	*chks = (*chks)[:0]
+
 	for i, c := range s.chunks {
-		metas = append(metas, &ChunkMeta{
+		*chks = append(*chks, &ChunkMeta{
 			MinTime: c.minTime,
 			MaxTime: c.maxTime,
 			Ref:     (uint64(ref) << 32) | uint64(i),
 		})
 	}
 
-	return s.lset, metas, nil
+	return nil
 }
 
 func (h *headIndexReader) LabelIndices() ([][]string, error) {
@@ -760,6 +774,9 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
 		valset.set(l.Value)
 
 		h.postings.add(s.ref, term{name: l.Name, value: l.Value})
+
+		h.symbols[l.Name] = struct{}{}
+		h.symbols[l.Value] = struct{}{}
 	}
 
 	h.postings.add(s.ref, term{})
diff --git a/index.go b/index.go
index 3264d9263e..c948ee27c2 100644
--- a/index.go
+++ b/index.go
@@ -61,7 +61,9 @@ func (s indexWriterSeriesSlice) Less(i, j int) bool {
 type indexWriterStage uint8
 
 const (
-	idxStagePopulate indexWriterStage = iota
+	idxStageNone indexWriterStage = iota
+	idxStageSymbols
+	idxStageSeries
 	idxStageLabelIndex
 	idxStagePostings
 	idxStageDone
@@ -69,8 +71,12 @@ const (
 
 func (s indexWriterStage) String() string {
 	switch s {
-	case idxStagePopulate:
-		return "populate"
+	case idxStageNone:
+		return "none"
+	case idxStageSymbols:
+		return "symbols"
+	case idxStageSeries:
+		return "series"
 	case idxStageLabelIndex:
 		return "label index"
 	case idxStagePostings:
@@ -82,12 +88,18 @@ func (s indexWriterStage) String() string {
 }
 
 // IndexWriter serializes the index for a block of series data.
-// The methods must generally be called in the order they are specified in.
+// The methods must be called in the order they are specified in.
 type IndexWriter interface {
+	// AddSymbols registers all string symbols that are encountered in series
+	// and other indices.
+	AddSymbols(sym map[string]struct{}) error
+
 	// AddSeries populates the index writer with a series and its offsets
 	// of chunks that the index can reference.
-	// The reference number is used to resolve a series against the postings
-	// list iterator. It only has to be available during the write processing.
+	// Implementations may require series to be insert in increasing order by
+	// their labels.
+	// The reference numbers are used to resolve entries in postings lists that
+	// are added later.
 	AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
 
 	// WriteLabelIndex serializes an index from label names to values.
@@ -118,10 +130,13 @@ type indexWriter struct {
 	buf2    encbuf
 	uint32s []uint32
 
-	series       map[uint32]*indexWriterSeries
-	symbols      map[string]uint32 // symbol offsets
-	labelIndexes []hashEntry       // label index offsets
-	postings     []hashEntry       // postings lists offsets
+	symbols       map[string]uint32 // symbol offsets
+	seriesOffsets map[uint32]uint64 // offsets of series
+	labelIndexes  []hashEntry       // label index offsets
+	postings      []hashEntry       // postings lists offsets
+
+	// Hold last series to validate that clients insert new series in order.
+	lastSeries labels.Labels
 
 	crc32 hash.Hash
 }
@@ -152,7 +167,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
 		f:     f,
 		fbuf:  bufio.NewWriterSize(f, 1<<22),
 		pos:   0,
-		stage: idxStagePopulate,
+		stage: idxStageNone,
 
 		// Reusable memory.
 		buf1:    encbuf{b: make([]byte, 0, 1<<22)},
@@ -160,9 +175,9 @@ func newIndexWriter(dir string) (*indexWriter, error) {
 		uint32s: make([]uint32, 0, 1<<15),
 
 		// Caches.
-		symbols: make(map[string]uint32, 1<<13),
-		series:  make(map[uint32]*indexWriterSeries, 1<<16),
-		crc32:   crc32.New(crc32.MakeTable(crc32.Castagnoli)),
+		symbols:       make(map[string]uint32, 1<<13),
+		seriesOffsets: make(map[uint32]uint64, 1<<16),
+		crc32:         crc32.New(crc32.MakeTable(crc32.Castagnoli)),
 	}
 	if err := iw.writeMeta(); err != nil {
 		return nil, err
@@ -207,20 +222,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
 		return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
 	}
 
-	// Complete population stage by writing symbols and series.
-	if w.stage == idxStagePopulate {
-		w.toc.symbols = w.pos
-		if err := w.writeSymbols(); err != nil {
-			return err
-		}
-		w.toc.series = w.pos
-		if err := w.writeSeries(); err != nil {
-			return err
-		}
-	}
-
 	// Mark start of sections in table of contents.
 	switch s {
+	case idxStageSymbols:
+		w.toc.symbols = w.pos
+	case idxStageSeries:
+		w.toc.series = w.pos
+
 	case idxStageLabelIndex:
 		w.toc.labelIndices = w.pos
 
@@ -254,26 +262,65 @@ func (w *indexWriter) writeMeta() error {
 }
 
 func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
-	if _, ok := w.series[ref]; ok {
-		return errors.Errorf("series with reference %d already added", ref)
+	if err := w.ensureStage(idxStageSeries); err != nil {
+		return err
 	}
-	// Populate the symbol table from all label sets we have to reference.
-	for _, l := range lset {
-		w.symbols[l.Name] = 0
-		w.symbols[l.Value] = 0
+	if labels.Compare(lset, w.lastSeries) <= 0 {
+		return errors.Errorf("out-of-order series added with label set %q", lset)
 	}
 
-	w.series[ref] = &indexWriterSeries{
-		labels: lset,
-		chunks: chunks,
+	if _, ok := w.seriesOffsets[ref]; ok {
+		return errors.Errorf("series with reference %d already added", ref)
 	}
+	w.seriesOffsets[ref] = w.pos
+
+	w.buf2.reset()
+	w.buf2.putUvarint(len(lset))
+
+	for _, l := range lset {
+		offset, ok := w.symbols[l.Name]
+		if !ok {
+			return errors.Errorf("symbol entry for %q does not exist", l.Name)
+		}
+		w.buf2.putUvarint32(offset)
+
+		offset, ok = w.symbols[l.Value]
+		if !ok {
+			return errors.Errorf("symbol entry for %q does not exist", l.Value)
+		}
+		w.buf2.putUvarint32(offset)
+	}
+
+	w.buf2.putUvarint(len(chunks))
+
+	for _, c := range chunks {
+		w.buf2.putVarint64(c.MinTime)
+		w.buf2.putVarint64(c.MaxTime)
+		w.buf2.putUvarint64(c.Ref)
+	}
+
+	w.buf1.reset()
+	w.buf1.putUvarint(w.buf2.len())
+
+	w.buf2.putHash(w.crc32)
+
+	if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
+		return errors.Wrap(err, "write series data")
+	}
+
+	w.lastSeries = append(w.lastSeries[:0], lset...)
+
 	return nil
 }
 
-func (w *indexWriter) writeSymbols() error {
+func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
+	if err := w.ensureStage(idxStageSymbols); err != nil {
+		return err
+	}
 	// Generate sorted list of strings we will store as reference table.
-	symbols := make([]string, 0, len(w.symbols))
-	for s := range w.symbols {
+	symbols := make([]string, 0, len(sym))
+
+	for s := range sym {
 		symbols = append(symbols, s)
 	}
 	sort.Strings(symbols)
@@ -285,12 +332,14 @@ func (w *indexWriter) writeSymbols() error {
 
 	w.buf2.putBE32int(len(symbols))
 
+	w.symbols = make(map[string]uint32, len(symbols))
+
 	for _, s := range symbols {
 		w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
 
 		// NOTE: len(s) gives the number of runes, not the number of bytes.
 		// Therefore the read-back length for strings with unicode characters will
-		// be off when not using putCstr.
+		// be off when not using putUvarintStr.
 		w.buf2.putUvarintStr(s)
 	}
 
@@ -301,55 +350,6 @@ func (w *indexWriter) writeSymbols() error {
 	return errors.Wrap(err, "write symbols")
 }
 
-func (w *indexWriter) writeSeries() error {
-	// Series must be stored sorted along their labels.
-	series := make(indexWriterSeriesSlice, 0, len(w.series))
-
-	for _, s := range w.series {
-		series = append(series, s)
-	}
-	sort.Sort(series)
-
-	// Header holds number of series.
-	w.buf1.reset()
-	w.buf1.putBE32int(len(series))
-
-	if err := w.write(w.buf1.get()); err != nil {
-		return errors.Wrap(err, "write series count")
-	}
-
-	for _, s := range series {
-		s.offset = uint32(w.pos)
-
-		w.buf2.reset()
-		w.buf2.putUvarint(len(s.labels))
-
-		for _, l := range s.labels {
-			w.buf2.putUvarint32(w.symbols[l.Name])
-			w.buf2.putUvarint32(w.symbols[l.Value])
-		}
-
-		w.buf2.putUvarint(len(s.chunks))
-
-		for _, c := range s.chunks {
-			w.buf2.putVarint64(c.MinTime)
-			w.buf2.putVarint64(c.MaxTime)
-			w.buf2.putUvarint64(c.Ref)
-		}
-
-		w.buf1.reset()
-		w.buf1.putUvarint(w.buf2.len())
-
-		w.buf2.putHash(w.crc32)
-
-		if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
-			return errors.Wrap(err, "write series data")
-		}
-	}
-
-	return nil
-}
-
 func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
 	if len(values)%len(names) != 0 {
 		return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
@@ -379,7 +379,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
 	w.buf2.putBE32int(valt.Len())
 
 	for _, v := range valt.s {
-		w.buf2.putBE32(w.symbols[v])
+		offset, ok := w.symbols[v]
+		if !ok {
+			return errors.Errorf("symbol entry for %q does not exist", v)
+		}
+		w.buf2.putBE32(offset)
 	}
 
 	w.buf1.reset()
@@ -450,11 +454,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
 	refs := w.uint32s[:0]
 
 	for it.Next() {
-		s, ok := w.series[it.At()]
+		offset, ok := w.seriesOffsets[it.At()]
 		if !ok {
-			return errors.Errorf("series for reference %d not found", it.At())
+			return errors.Errorf("%p series for reference %d not found", w, it.At())
 		}
-		refs = append(refs, s.offset)
+		refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out.
 	}
 	if err := it.Err(); err != nil {
 		return err
@@ -503,6 +507,10 @@ func (w *indexWriter) Close() error {
 
 // IndexReader provides reading access of serialized index data.
 type IndexReader interface {
+	// Symbols returns a set of string symbols that may occur in series' labels
+	// and indices.
+	Symbols() (map[string]struct{}, error)
+
 	// LabelValues returns the possible label values
 	LabelValues(names ...string) (StringTuples, error)
 
@@ -510,8 +518,13 @@ type IndexReader interface {
 	// The Postings here contain the offsets to the series inside the index.
 	Postings(name, value string) (Postings, error)
 
-	// Series returns the series for the given reference.
-	Series(ref uint32) (labels.Labels, []*ChunkMeta, error)
+	// SortedPostings returns a postings list that is reordered to be sorted
+	// by the label set of the underlying series.
+	SortedPostings(Postings) Postings
+
+	// Series populates the given labels and chunk metas for the series identified
+	// by the reference.
+	Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error
 
 	// LabelIndices returns the label pairs for which indices exist.
 	LabelIndices() ([][]string, error)
@@ -664,6 +677,21 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	return s, nil
 }
 
+func (r *indexReader) Symbols() (map[string]struct{}, error) {
+	d1 := r.decbufAt(int(r.toc.symbols))
+	d2 := d1.decbuf(d1.be32int())
+
+	count := d2.be32int()
+	sym := make(map[string]struct{}, count)
+
+	for ; count > 0; count-- {
+		s := d2.uvarintStr()
+		sym[s] = struct{}{}
+	}
+
+	return sym, d2.err()
+}
+
 func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 	const sep = "\xff"
 
@@ -712,36 +740,37 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
 	return res, nil
 }
 
-func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
+func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
 	d1 := r.decbufAt(int(ref))
 	d2 := d1.decbuf(int(d1.uvarint()))
 
+	*lbls = (*lbls)[:0]
+	*chks = (*chks)[:0]
+
 	k := int(d2.uvarint())
-	lbls := make(labels.Labels, 0, k)
 
 	for i := 0; i < k; i++ {
 		lno := uint32(d2.uvarint())
 		lvo := uint32(d2.uvarint())
 
 		if d2.err() != nil {
-			return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
+			return errors.Wrap(d2.err(), "read series label offsets")
 		}
 
 		ln, err := r.lookupSymbol(lno)
 		if err != nil {
-			return nil, nil, errors.Wrap(err, "lookup label name")
+			return errors.Wrap(err, "lookup label name")
 		}
 		lv, err := r.lookupSymbol(lvo)
 		if err != nil {
-			return nil, nil, errors.Wrap(err, "lookup label value")
+			return errors.Wrap(err, "lookup label value")
 		}
 
-		lbls = append(lbls, labels.Label{Name: ln, Value: lv})
+		*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
 	}
 
 	// Read the chunks meta data.
 	k = int(d2.uvarint())
-	chunks := make([]*ChunkMeta, 0, k)
 
 	for i := 0; i < k; i++ {
 		mint := d2.varint64()
@@ -749,10 +778,10 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 		off := d2.uvarint64()
 
 		if d2.err() != nil {
-			return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
+			return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
 		}
 
-		chunks = append(chunks, &ChunkMeta{
+		*chks = append(*chks, &ChunkMeta{
 			Ref:     off,
 			MinTime: mint,
 			MaxTime: maxt,
@@ -761,7 +790,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 
 	// TODO(fabxc): verify CRC32.
 
-	return lbls, chunks, nil
+	return nil
 }
 
 func (r *indexReader) Postings(name, value string) (Postings, error) {
@@ -787,6 +816,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
 	return newBigEndianPostings(d2.get()), nil
 }
 
+func (r *indexReader) SortedPostings(p Postings) Postings {
+	return p
+}
+
 type stringTuples struct {
 	l int      // tuple length
 	s []string // flattened tuple entries
diff --git a/index_test.go b/index_test.go
index b38350dcf7..3616daf300 100644
--- a/index_test.go
+++ b/index_test.go
@@ -35,21 +35,31 @@ type series struct {
 type mockIndex struct {
 	series     map[uint32]series
 	labelIndex map[string][]string
-	postings   map[labels.Label]Postings
+	postings   *memPostings
+	symbols    map[string]struct{}
 }
 
 func newMockIndex() mockIndex {
 	return mockIndex{
 		series:     make(map[uint32]series),
 		labelIndex: make(map[string][]string),
-		postings:   make(map[labels.Label]Postings),
+		postings:   &memPostings{m: make(map[term][]uint32)},
+		symbols:    make(map[string]struct{}),
 	}
 }
 
+func (m mockIndex) Symbols() (map[string]struct{}, error) {
+	return m.symbols, nil
+}
+
 func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error {
 	if _, ok := m.series[ref]; ok {
 		return errors.Errorf("series with reference %d already added", ref)
 	}
+	for _, lbl := range l {
+		m.symbols[lbl.Name] = struct{}{}
+		m.symbols[lbl.Value] = struct{}{}
+	}
 
 	s := series{l: l}
 	// Actual chunk data is not stored in the index.
@@ -75,41 +85,16 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
 }
 
 func (m mockIndex) WritePostings(name, value string, it Postings) error {
-	lbl := labels.Label{
-		Name:  name,
-		Value: value,
+	if _, ok := m.postings.m[term{name, value}]; ok {
+		return errors.Errorf("postings for %s=%q already added", name, value)
 	}
-
-	type refdSeries struct {
-		ref    uint32
-		series series
-	}
-
-	// Re-Order so that the list is ordered by labels of the series.
-	// Internally that is how the series are laid out.
-	refs := make([]refdSeries, 0)
-	for it.Next() {
-		s, ok := m.series[it.At()]
-		if !ok {
-			return errors.Errorf("series for reference %d not found", it.At())
-		}
-		refs = append(refs, refdSeries{it.At(), s})
-	}
-	if err := it.Err(); err != nil {
+	ep, err := expandPostings(it)
+	if err != nil {
 		return err
 	}
+	m.postings.m[term{name, value}] = ep
 
-	sort.Slice(refs, func(i, j int) bool {
-		return labels.Compare(refs[i].series.l, refs[j].series.l) < 0
-	})
-
-	postings := make([]uint32, 0, len(refs))
-	for _, r := range refs {
-		postings = append(postings, r.ref)
-	}
-
-	m.postings[lbl] = newListPostings(postings)
-	return nil
+	return it.Err()
 }
 
 func (m mockIndex) Close() error {
@@ -126,26 +111,30 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
 }
 
 func (m mockIndex) Postings(name, value string) (Postings, error) {
-	lbl := labels.Label{
-		Name:  name,
-		Value: value,
-	}
-
-	p, ok := m.postings[lbl]
-	if !ok {
-		return nil, ErrNotFound
-	}
-
-	return p, nil
+	return m.postings.get(term{name, value}), nil
 }
 
-func (m mockIndex) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
-	s, ok := m.series[ref]
-	if !ok {
-		return nil, nil, ErrNotFound
+func (m mockIndex) SortedPostings(p Postings) Postings {
+	ep, err := expandPostings(p)
+	if err != nil {
+		return errPostings{err: errors.Wrap(err, "expand postings")}
 	}
 
-	return s.l, s.chunks, nil
+	sort.Slice(ep, func(i, j int) bool {
+		return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0
+	})
+	return newListPostings(ep)
+}
+
+func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error {
+	s, ok := m.series[ref]
+	if !ok {
+		return ErrNotFound
+	}
+	*lset = append((*lset)[:0], s.l...)
+	*chks = append((*chks)[:0], s.chunks...)
+
+	return nil
 }
 
 func (m mockIndex) LabelIndices() ([][]string, error) {
@@ -197,11 +186,21 @@ func TestIndexRW_Postings(t *testing.T) {
 		labels.FromStrings("a", "1", "b", "4"),
 	}
 
+	err = iw.AddSymbols(map[string]struct{}{
+		"a": struct{}{},
+		"b": struct{}{},
+		"1": struct{}{},
+		"2": struct{}{},
+		"3": struct{}{},
+		"4": struct{}{},
+	})
+	require.NoError(t, err)
+
 	// Postings lists are only written if a series with the respective
 	// reference was added before.
 	require.NoError(t, iw.AddSeries(1, series[0]))
-	require.NoError(t, iw.AddSeries(3, series[2]))
 	require.NoError(t, iw.AddSeries(2, series[1]))
+	require.NoError(t, iw.AddSeries(3, series[2]))
 	require.NoError(t, iw.AddSeries(4, series[3]))
 
 	err = iw.WritePostings("a", "1", newListPostings([]uint32{1, 2, 3, 4}))
@@ -215,8 +214,11 @@ func TestIndexRW_Postings(t *testing.T) {
 	p, err := ir.Postings("a", "1")
 	require.NoError(t, err)
 
+	var l labels.Labels
+	var c []*ChunkMeta
+
 	for i := 0; p.Next(); i++ {
-		l, c, err := ir.Series(p.At())
+		err := ir.Series(p.At(), &l, &c)
 
 		require.NoError(t, err)
 		require.Equal(t, 0, len(c))
@@ -235,6 +237,17 @@ func TestPersistence_index_e2e(t *testing.T) {
 	lbls, err := readPrometheusLabels("testdata/20k.series", 20000)
 	require.NoError(t, err)
 
+	// Sort labels as the index writer expects series in sorted order.
+	sort.Sort(labels.Slice(lbls))
+
+	symbols := map[string]struct{}{}
+	for _, lset := range lbls {
+		for _, l := range lset {
+			symbols[l.Name] = struct{}{}
+			symbols[l.Value] = struct{}{}
+		}
+	}
+
 	var input indexWriterSeriesSlice
 
 	// Generate ChunkMetas for every label set.
@@ -258,6 +271,8 @@ func TestPersistence_index_e2e(t *testing.T) {
 	iw, err := newIndexWriter(dir)
 	require.NoError(t, err)
 
+	require.NoError(t, iw.AddSymbols(symbols))
+
 	// Population procedure as done by compaction.
 	var (
 		postings = &memPostings{m: make(map[term][]uint32, 512)}
@@ -311,21 +326,24 @@ func TestPersistence_index_e2e(t *testing.T) {
 	ir, err := newIndexReader(dir)
 	require.NoError(t, err)
 
-	for p := range mi.postings {
-		gotp, err := ir.Postings(p.Name, p.Value)
+	for p := range mi.postings.m {
+		gotp, err := ir.Postings(p.name, p.value)
 		require.NoError(t, err)
 
-		expp, err := mi.Postings(p.Name, p.Value)
+		expp, err := mi.Postings(p.name, p.value)
+
+		var lset, explset labels.Labels
+		var chks, expchks []*ChunkMeta
 
 		for gotp.Next() {
 			require.True(t, expp.Next())
 
 			ref := gotp.At()
 
-			lset, chks, err := ir.Series(ref)
+			err := ir.Series(ref, &lset, &chks)
 			require.NoError(t, err)
 
-			explset, expchks, err := mi.Series(expp.At())
+			err = mi.Series(expp.At(), &explset, &expchks)
 			require.Equal(t, explset, lset)
 			require.Equal(t, expchks, chks)
 		}
diff --git a/querier.go b/querier.go
index 9533683d33..a54acdd5a1 100644
--- a/querier.go
+++ b/querier.go
@@ -15,7 +15,7 @@ package tsdb
 
 import (
 	"fmt"
-        "sort"
+	"sort"
 	"strings"
 
 	"github.com/prometheus/tsdb/chunks"
@@ -134,8 +134,6 @@ type blockQuerier struct {
 	chunks     ChunkReader
 	tombstones TombstoneReader
 
-	postingsMapper func(Postings) Postings
-
 	mint, maxt int64
 }
 
@@ -144,10 +142,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 
 	p, absent := pr.Select(ms...)
 
-	if q.postingsMapper != nil {
-		p = q.postingsMapper(p)
-	}
-
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
 			set: &baseChunkSeries{
@@ -218,7 +212,7 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
 
 	p := Intersect(its...)
 
-	return p, absent
+	return r.index.SortedPostings(p), absent
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -434,11 +428,14 @@ func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
 func (s *baseChunkSeries) Err() error { return s.err }
 
 func (s *baseChunkSeries) Next() bool {
+	var (
+		lset   labels.Labels
+		chunks []*ChunkMeta
+	)
 Outer:
 	for s.p.Next() {
 		ref := s.p.At()
-		lset, chunks, err := s.index.Series(ref)
-		if err != nil {
+		if err := s.index.Series(ref, &lset, &chunks); err != nil {
 			s.err = err
 			return false
 		}
diff --git a/querier_test.go b/querier_test.go
index 53d9040877..d8e8f76574 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -432,18 +432,18 @@ func TestBlockQuerier(t *testing.T) {
 				maxt: 6,
 				ms:   []labels.Matcher{labels.NewPrefixMatcher("p", "abc")},
 				exp: newListSeriesSet([]Series{
-					newSeries(map[string]string{
-						"p": "abcd",
-						"x": "xyz",
-					},
-						[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
-					),
 					newSeries(map[string]string{
 						"a": "ab",
 						"p": "abce",
 					},
 						[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}},
 					),
+					newSeries(map[string]string{
+						"p": "abcd",
+						"x": "xyz",
+					},
+						[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
+					),
 				}),
 			},
 		},