diff --git a/Documentation/format/index.md b/Documentation/format/index.md index c6e3a444a..3beebb6fe 100644 --- a/Documentation/format/index.md +++ b/Documentation/format/index.md @@ -69,8 +69,6 @@ The file offset to the beginning of a series serves as the series' ID in all sub ``` ┌───────────────────────────────────────┐ -│ #series <4b> │ -├───────────────────────────────────────┤ │ ┌───────────────────────────────────┐ │ │ │ series_1 │ │ │ ├───────────────────────────────────┤ │ diff --git a/block.go b/block.go index 45c8b60cf..53f8b6c4f 100644 --- a/block.go +++ b/block.go @@ -251,9 +251,12 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { // Choose only valid postings which have chunks in the time-range. stones := map[uint32]intervals{} + var lset labels.Labels + var chks []*ChunkMeta + Outer: for p.Next() { - lset, chunks, err := ir.Series(p.At()) + err := ir.Series(p.At(), &lset, &chks) if err != nil { return err } @@ -264,10 +267,10 @@ Outer: } } - for _, chk := range chunks { + for _, chk := range chks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { // Delete only until the current vlaues and not beyond. - tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime) + tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) stones[p.At()] = intervals{{tmin, tmax}} continue Outer } diff --git a/chunks/chunk.go b/chunks/chunk.go index 4eeb9c5d7..6bed4455f 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -13,10 +13,7 @@ package chunks -import ( - "encoding/binary" - "fmt" -) +import "fmt" // Encoding is the identifier for a chunk encoding. 
type Encoding uint8 @@ -50,10 +47,7 @@ type Chunk interface { func FromData(e Encoding, d []byte) (Chunk, error) { switch e { case EncXOR: - return &XORChunk{ - b: &bstream{count: 0, stream: d}, - num: binary.BigEndian.Uint16(d), - }, nil + return &XORChunk{b: &bstream{count: 0, stream: d}}, nil } return nil, fmt.Errorf("unknown chunk encoding: %d", e) } diff --git a/chunks/xor.go b/chunks/xor.go index e9bdef074..501db704a 100644 --- a/chunks/xor.go +++ b/chunks/xor.go @@ -52,8 +52,7 @@ import ( // XORChunk holds XOR encoded sample data. type XORChunk struct { - b *bstream - num uint16 + b *bstream } // NewXORChunk returns a new chunk with XOR encoding of the given size. diff --git a/compact.go b/compact.go index e973b3abf..0027cd50c 100644 --- a/compact.go +++ b/compact.go @@ -398,17 +398,31 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { - var set compactionSet - var metas []BlockMeta - + var ( + set compactionSet + metas []BlockMeta + allSymbols = make(map[string]struct{}, 1<<16) + ) for i, b := range blocks { metas = append(metas, b.Meta()) - all, err := b.Index().Postings("", "") + symbols, err := b.Index().Symbols() + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + for s := range symbols { + allSymbols[s] = struct{}{} + } + + indexr := b.Index() + + all, err := indexr.Postings("", "") if err != nil { return nil, err } - s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all) + all = indexr.SortedPostings(all) + + s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all) if i == 0 { set = s @@ -428,6 +442,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo meta = compactBlockMetas(metas...) 
) + if err := indexw.AddSymbols(allSymbols); err != nil { + return nil, errors.Wrap(err, "add symbols") + } + for set.Next() { lset, chks, dranges := set.At() // The chunks here are not fully deleted. @@ -461,7 +479,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo return nil, err } - indexw.AddSeries(i, lset, chks...) + if err := indexw.AddSeries(i, lset, chks...); err != nil { + return nil, errors.Wrapf(err, "add series") + } meta.Stats.NumChunks += uint64(len(chks)) meta.Stats.NumSeries++ @@ -525,6 +545,7 @@ type compactionSeriesSet struct { index IndexReader chunks ChunkReader tombstones TombstoneReader + series SeriesSet l labels.Labels c []*ChunkMeta @@ -545,11 +566,9 @@ func (c *compactionSeriesSet) Next() bool { if !c.p.Next() { return false } - c.intervals = c.tombstones.Get(c.p.At()) - c.l, c.c, c.err = c.index.Series(c.p.At()) - if c.err != nil { + if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil { return false } @@ -629,14 +648,24 @@ func (c *compactionMerger) Next() bool { if !c.aok && !c.bok || c.Err() != nil { return false } + // While advancing child iterators the memory used for labels and chunks + // may be reused. When picking a series we have to store the result. + var lset labels.Labels + var chks []*ChunkMeta d := c.compare() // Both sets contain the current series. Chain them into a single one. if d > 0 { - c.l, c.c, c.intervals = c.b.At() + lset, chks, c.intervals = c.b.At() + c.l = append(c.l[:0], lset...) + c.c = append(c.c[:0], chks...) + c.bok = c.b.Next() } else if d < 0 { - c.l, c.c, c.intervals = c.a.At() + lset, chks, c.intervals = c.a.At() + c.l = append(c.l[:0], lset...) + c.c = append(c.c[:0], chks...) + c.aok = c.a.Next() } else { l, ca, ra := c.a.At() @@ -645,8 +674,8 @@ func (c *compactionMerger) Next() bool { ra = ra.add(r) } - c.l = l - c.c = append(ca, cb...) + c.l = append(c.l[:0], l...) + c.c = append(append(c.c[:0], ca...), cb...) 
c.intervals = ra c.aok = c.a.Next() diff --git a/head.go b/head.go index cb8e7329a..9e99d3777 100644 --- a/head.go +++ b/head.go @@ -67,6 +67,7 @@ type HeadBlock struct { // to their chunk descs. hashes map[uint64][]*memSeries + symbols map[string]struct{} values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -117,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, + symbols: map[string]struct{}{}, postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, tombstones: newEmptyTombstoneReader(), @@ -332,7 +334,12 @@ func (h *HeadBlock) Snapshot(snapshotDir string) error { func (h *HeadBlock) Dir() string { return h.dir } // Index returns an IndexReader against the block. -func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *HeadBlock) Index() IndexReader { + h.mtx.RLock() + defer h.mtx.RUnlock() + + return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)} +} // Chunks returns a ChunkReader against the block. func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } @@ -340,14 +347,10 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } // Querier returns a new Querier against the block for the range [mint, maxt]. func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() - defer h.mtx.RUnlock() - if h.closed { panic(fmt.Sprintf("block %s already closed", h.dir)) } - - // Reference on the original slice to use for postings mapping. 
- series := h.series[:] + h.mtx.RUnlock() return &blockQuerier{ mint: mint, @@ -355,27 +358,6 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { index: h.Index(), chunks: h.Chunks(), tombstones: h.Tombstones(), - - postingsMapper: func(p Postings) Postings { - ep := make([]uint32, 0, 64) - - for p.Next() { - // Skip posting entries that include series added after we - // instantiated the querier. - if int(p.At()) >= len(series) { - break - } - ep = append(ep, p.At()) - } - if err := p.Err(); err != nil { - return errPostings{err: errors.Wrap(err, "expand postings")} - } - - sort.Slice(ep, func(i, j int) bool { - return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0 - }) - return newListPostings(ep) - }, } } @@ -661,6 +643,12 @@ func (c *safeChunk) Iterator() chunks.Iterator { type headIndexReader struct { *HeadBlock + // Highest series that existed when the index reader was instantiated. + maxSeries uint32 +} + +func (h *headIndexReader) Symbols() (map[string]struct{}, error) { + return h.symbols, nil } // LabelValues returns the possible label values @@ -689,33 +677,59 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) { return h.postings.get(term{name: name, value: value}), nil } -// Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { +func (h *headIndexReader) SortedPostings(p Postings) Postings { h.mtx.RLock() defer h.mtx.RUnlock() - if int(ref) >= len(h.series) { - return nil, nil, ErrNotFound + ep := make([]uint32, 0, 1024) + + for p.Next() { + // Skip posting entries that include series added after we + // instantiated the index reader. 
+ if p.At() > h.maxSeries { + break + } + ep = append(ep, p.At()) + } + if err := p.Err(); err != nil { + return errPostings{err: errors.Wrap(err, "expand postings")} + } + + sort.Slice(ep, func(i, j int) bool { + return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0 + }) + return newListPostings(ep) +} + +// Series returns the series for the given reference. +func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if ref > h.maxSeries { + return ErrNotFound } s := h.series[ref] if s == nil { - return nil, nil, ErrNotFound + return ErrNotFound } - metas := make([]*ChunkMeta, 0, len(s.chunks)) + *lbls = append((*lbls)[:0], s.lset...) s.mtx.RLock() defer s.mtx.RUnlock() + *chks = (*chks)[:0] + for i, c := range s.chunks { - metas = append(metas, &ChunkMeta{ + *chks = append(*chks, &ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: (uint64(ref) << 32) | uint64(i), }) } - return s.lset, metas, nil + return nil } func (h *headIndexReader) LabelIndices() ([][]string, error) { @@ -760,6 +774,9 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { valset.set(l.Value) h.postings.add(s.ref, term{name: l.Name, value: l.Value}) + + h.symbols[l.Name] = struct{}{} + h.symbols[l.Value] = struct{}{} } h.postings.add(s.ref, term{}) diff --git a/index.go b/index.go index 3264d9263..c948ee27c 100644 --- a/index.go +++ b/index.go @@ -61,7 +61,9 @@ func (s indexWriterSeriesSlice) Less(i, j int) bool { type indexWriterStage uint8 const ( - idxStagePopulate indexWriterStage = iota + idxStageNone indexWriterStage = iota + idxStageSymbols + idxStageSeries idxStageLabelIndex idxStagePostings idxStageDone @@ -69,8 +71,12 @@ const ( func (s indexWriterStage) String() string { switch s { - case idxStagePopulate: - return "populate" + case idxStageNone: + return "none" + case idxStageSymbols: + return "symbols" + case idxStageSeries: + return "series" case 
idxStageLabelIndex: return "label index" case idxStagePostings: @@ -82,12 +88,18 @@ func (s indexWriterStage) String() string { } // IndexWriter serializes the index for a block of series data. -// The methods must generally be called in the order they are specified in. +// The methods must be called in the order they are specified in. type IndexWriter interface { + // AddSymbols registers all string symbols that are encountered in series + // and other indices. + AddSymbols(sym map[string]struct{}) error + // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. - // The reference number is used to resolve a series against the postings - // list iterator. It only has to be available during the write processing. + // Implementations may require series to be inserted in increasing order by + // their labels. + // The reference numbers are used to resolve entries in postings lists that + // are added later. AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. @@ -118,10 +130,13 @@ type indexWriter struct { buf2 encbuf uint32s []uint32 - series map[uint32]*indexWriterSeries - symbols map[string]uint32 // symbol offsets - labelIndexes []hashEntry // label index offsets - postings []hashEntry // postings lists offsets + symbols map[string]uint32 // symbol offsets + seriesOffsets map[uint32]uint64 // offsets of series + labelIndexes []hashEntry // label index offsets + postings []hashEntry // postings lists offsets + + // Hold last series to validate that clients insert new series in order. + lastSeries labels.Labels crc32 hash.Hash } @@ -152,7 +167,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, - stage: idxStagePopulate, + stage: idxStageNone, // Reusable memory. 
buf1: encbuf{b: make([]byte, 0, 1<<22)}, @@ -160,9 +175,9 @@ func newIndexWriter(dir string) (*indexWriter, error) { uint32s: make([]uint32, 0, 1<<15), // Caches. - symbols: make(map[string]uint32, 1<<13), - series: make(map[uint32]*indexWriterSeries, 1<<16), - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + symbols: make(map[string]uint32, 1<<13), + seriesOffsets: make(map[uint32]uint64, 1<<16), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := iw.writeMeta(); err != nil { return nil, err @@ -207,20 +222,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } - // Complete population stage by writing symbols and series. - if w.stage == idxStagePopulate { - w.toc.symbols = w.pos - if err := w.writeSymbols(); err != nil { - return err - } - w.toc.series = w.pos - if err := w.writeSeries(); err != nil { - return err - } - } - // Mark start of sections in table of contents. switch s { + case idxStageSymbols: + w.toc.symbols = w.pos + case idxStageSeries: + w.toc.series = w.pos + case idxStageLabelIndex: w.toc.labelIndices = w.pos @@ -254,26 +262,65 @@ func (w *indexWriter) writeMeta() error { } func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { - if _, ok := w.series[ref]; ok { - return errors.Errorf("series with reference %d already added", ref) + if err := w.ensureStage(idxStageSeries); err != nil { + return err } - // Populate the symbol table from all label sets we have to reference. 
- for _, l := range lset { - w.symbols[l.Name] = 0 - w.symbols[l.Value] = 0 + if labels.Compare(lset, w.lastSeries) <= 0 { + return errors.Errorf("out-of-order series added with label set %q", lset) } - w.series[ref] = &indexWriterSeries{ - labels: lset, - chunks: chunks, + if _, ok := w.seriesOffsets[ref]; ok { + return errors.Errorf("series with reference %d already added", ref) } + w.seriesOffsets[ref] = w.pos + + w.buf2.reset() + w.buf2.putUvarint(len(lset)) + + for _, l := range lset { + offset, ok := w.symbols[l.Name] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", l.Name) + } + w.buf2.putUvarint32(offset) + + offset, ok = w.symbols[l.Value] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", l.Value) + } + w.buf2.putUvarint32(offset) + } + + w.buf2.putUvarint(len(chunks)) + + for _, c := range chunks { + w.buf2.putVarint64(c.MinTime) + w.buf2.putVarint64(c.MaxTime) + w.buf2.putUvarint64(c.Ref) + } + + w.buf1.reset() + w.buf1.putUvarint(w.buf2.len()) + + w.buf2.putHash(w.crc32) + + if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { + return errors.Wrap(err, "write series data") + } + + w.lastSeries = append(w.lastSeries[:0], lset...) + return nil } -func (w *indexWriter) writeSymbols() error { +func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { + if err := w.ensureStage(idxStageSymbols); err != nil { + return err + } // Generate sorted list of strings we will store as reference table. - symbols := make([]string, 0, len(w.symbols)) - for s := range w.symbols { + symbols := make([]string, 0, len(sym)) + + for s := range sym { symbols = append(symbols, s) } sort.Strings(symbols) @@ -285,12 +332,14 @@ func (w *indexWriter) writeSymbols() error { w.buf2.putBE32int(len(symbols)) + w.symbols = make(map[string]uint32, len(symbols)) + for _, s := range symbols { w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) // NOTE: len(s) gives the number of runes, not the number of bytes. 
// Therefore the read-back length for strings with unicode characters will - // be off when not using putCstr. + // be off when not using putUvarintStr. w.buf2.putUvarintStr(s) } @@ -301,55 +350,6 @@ func (w *indexWriter) writeSymbols() error { return errors.Wrap(err, "write symbols") } -func (w *indexWriter) writeSeries() error { - // Series must be stored sorted along their labels. - series := make(indexWriterSeriesSlice, 0, len(w.series)) - - for _, s := range w.series { - series = append(series, s) - } - sort.Sort(series) - - // Header holds number of series. - w.buf1.reset() - w.buf1.putBE32int(len(series)) - - if err := w.write(w.buf1.get()); err != nil { - return errors.Wrap(err, "write series count") - } - - for _, s := range series { - s.offset = uint32(w.pos) - - w.buf2.reset() - w.buf2.putUvarint(len(s.labels)) - - for _, l := range s.labels { - w.buf2.putUvarint32(w.symbols[l.Name]) - w.buf2.putUvarint32(w.symbols[l.Value]) - } - - w.buf2.putUvarint(len(s.chunks)) - - for _, c := range s.chunks { - w.buf2.putVarint64(c.MinTime) - w.buf2.putVarint64(c.MaxTime) - w.buf2.putUvarint64(c.Ref) - } - - w.buf1.reset() - w.buf1.putUvarint(w.buf2.len()) - - w.buf2.putHash(w.crc32) - - if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { - return errors.Wrap(err, "write series data") - } - } - - return nil -} - func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { if len(values)%len(names) != 0 { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) @@ -379,7 +379,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { w.buf2.putBE32int(valt.Len()) for _, v := range valt.s { - w.buf2.putBE32(w.symbols[v]) + offset, ok := w.symbols[v] + if !ok { + return errors.Errorf("symbol entry for %q does not exist", v) + } + w.buf2.putBE32(offset) } w.buf1.reset() @@ -450,11 +454,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { refs := 
w.uint32s[:0] for it.Next() { - s, ok := w.series[it.At()] + offset, ok := w.seriesOffsets[it.At()] if !ok { - return errors.Errorf("series for reference %d not found", it.At()) + return errors.Errorf("%p series for reference %d not found", w, it.At()) } - refs = append(refs, s.offset) + refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out. } if err := it.Err(); err != nil { return err @@ -503,6 +507,10 @@ func (w *indexWriter) Close() error { // IndexReader provides reading access of serialized index data. type IndexReader interface { + // Symbols returns a set of string symbols that may occur in series' labels + // and indices. + Symbols() (map[string]struct{}, error) + // LabelValues returns the possible label values LabelValues(names ...string) (StringTuples, error) @@ -510,8 +518,13 @@ type IndexReader interface { // The Postings here contain the offsets to the series inside the index. Postings(name, value string) (Postings, error) - // Series returns the series for the given reference. - Series(ref uint32) (labels.Labels, []*ChunkMeta, error) + // SortedPostings returns a postings list that is reordered to be sorted + // by the label set of the underlying series. + SortedPostings(Postings) Postings + + // Series populates the given labels and chunk metas for the series identified + // by the reference. + Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. 
LabelIndices() ([][]string, error) @@ -664,6 +677,21 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) { return s, nil } +func (r *indexReader) Symbols() (map[string]struct{}, error) { + d1 := r.decbufAt(int(r.toc.symbols)) + d2 := d1.decbuf(d1.be32int()) + + count := d2.be32int() + sym := make(map[string]struct{}, count) + + for ; count > 0; count-- { + s := d2.uvarintStr() + sym[s] = struct{}{} + } + + return sym, d2.err() +} + func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { const sep = "\xff" @@ -712,36 +740,37 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil } -func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { +func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { d1 := r.decbufAt(int(ref)) d2 := d1.decbuf(int(d1.uvarint())) + *lbls = (*lbls)[:0] + *chks = (*chks)[:0] + k := int(d2.uvarint()) - lbls := make(labels.Labels, 0, k) for i := 0; i < k; i++ { lno := uint32(d2.uvarint()) lvo := uint32(d2.uvarint()) if d2.err() != nil { - return nil, nil, errors.Wrap(d2.err(), "read series label offsets") + return errors.Wrap(d2.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) if err != nil { - return nil, nil, errors.Wrap(err, "lookup label name") + return errors.Wrap(err, "lookup label name") } lv, err := r.lookupSymbol(lvo) if err != nil { - return nil, nil, errors.Wrap(err, "lookup label value") + return errors.Wrap(err, "lookup label value") } - lbls = append(lbls, labels.Label{Name: ln, Value: lv}) + *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. 
k = int(d2.uvarint()) - chunks := make([]*ChunkMeta, 0, k) for i := 0; i < k; i++ { mint := d2.varint64() @@ -749,10 +778,10 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { off := d2.uvarint64() if d2.err() != nil { - return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i) + return errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - chunks = append(chunks, &ChunkMeta{ + *chks = append(*chks, &ChunkMeta{ Ref: off, MinTime: mint, MaxTime: maxt, @@ -761,7 +790,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { // TODO(fabxc): verify CRC32. - return lbls, chunks, nil + return nil } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -787,6 +816,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return newBigEndianPostings(d2.get()), nil } +func (r *indexReader) SortedPostings(p Postings) Postings { + return p +} + type stringTuples struct { l int // tuple length s []string // flattened tuple entries diff --git a/index_test.go b/index_test.go index b38350dcf..3616daf30 100644 --- a/index_test.go +++ b/index_test.go @@ -35,21 +35,31 @@ type series struct { type mockIndex struct { series map[uint32]series labelIndex map[string][]string - postings map[labels.Label]Postings + postings *memPostings + symbols map[string]struct{} } func newMockIndex() mockIndex { return mockIndex{ series: make(map[uint32]series), labelIndex: make(map[string][]string), - postings: make(map[labels.Label]Postings), + postings: &memPostings{m: make(map[term][]uint32)}, + symbols: make(map[string]struct{}), } } +func (m mockIndex) Symbols() (map[string]struct{}, error) { + return m.symbols, nil +} + func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error { if _, ok := m.series[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } + for _, lbl := range l { + m.symbols[lbl.Name] = struct{}{} + 
m.symbols[lbl.Value] = struct{}{} + } s := series{l: l} // Actual chunk data is not stored in the index. @@ -75,41 +85,16 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error { } func (m mockIndex) WritePostings(name, value string, it Postings) error { - lbl := labels.Label{ - Name: name, - Value: value, + if _, ok := m.postings.m[term{name, value}]; ok { + return errors.Errorf("postings for %s=%q already added", name, value) } - - type refdSeries struct { - ref uint32 - series series - } - - // Re-Order so that the list is ordered by labels of the series. - // Internally that is how the series are laid out. - refs := make([]refdSeries, 0) - for it.Next() { - s, ok := m.series[it.At()] - if !ok { - return errors.Errorf("series for reference %d not found", it.At()) - } - refs = append(refs, refdSeries{it.At(), s}) - } - if err := it.Err(); err != nil { + ep, err := expandPostings(it) + if err != nil { return err } + m.postings.m[term{name, value}] = ep - sort.Slice(refs, func(i, j int) bool { - return labels.Compare(refs[i].series.l, refs[j].series.l) < 0 - }) - - postings := make([]uint32, 0, len(refs)) - for _, r := range refs { - postings = append(postings, r.ref) - } - - m.postings[lbl] = newListPostings(postings) - return nil + return it.Err() } func (m mockIndex) Close() error { @@ -126,26 +111,30 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) { } func (m mockIndex) Postings(name, value string) (Postings, error) { - lbl := labels.Label{ - Name: name, - Value: value, - } - - p, ok := m.postings[lbl] - if !ok { - return nil, ErrNotFound - } - - return p, nil + return m.postings.get(term{name, value}), nil } -func (m mockIndex) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { - s, ok := m.series[ref] - if !ok { - return nil, nil, ErrNotFound +func (m mockIndex) SortedPostings(p Postings) Postings { + ep, err := expandPostings(p) + if err != nil { + return errPostings{err: errors.Wrap(err, "expand 
postings")} } - return s.l, s.chunks, nil + sort.Slice(ep, func(i, j int) bool { + return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0 + }) + return newListPostings(ep) +} + +func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error { + s, ok := m.series[ref] + if !ok { + return ErrNotFound + } + *lset = append((*lset)[:0], s.l...) + *chks = append((*chks)[:0], s.chunks...) + + return nil } func (m mockIndex) LabelIndices() ([][]string, error) { @@ -197,11 +186,21 @@ func TestIndexRW_Postings(t *testing.T) { labels.FromStrings("a", "1", "b", "4"), } + err = iw.AddSymbols(map[string]struct{}{ + "a": struct{}{}, + "b": struct{}{}, + "1": struct{}{}, + "2": struct{}{}, + "3": struct{}{}, + "4": struct{}{}, + }) + require.NoError(t, err) + // Postings lists are only written if a series with the respective // reference was added before. require.NoError(t, iw.AddSeries(1, series[0])) - require.NoError(t, iw.AddSeries(3, series[2])) require.NoError(t, iw.AddSeries(2, series[1])) + require.NoError(t, iw.AddSeries(3, series[2])) require.NoError(t, iw.AddSeries(4, series[3])) err = iw.WritePostings("a", "1", newListPostings([]uint32{1, 2, 3, 4})) @@ -215,8 +214,11 @@ func TestIndexRW_Postings(t *testing.T) { p, err := ir.Postings("a", "1") require.NoError(t, err) + var l labels.Labels + var c []*ChunkMeta + for i := 0; p.Next(); i++ { - l, c, err := ir.Series(p.At()) + err := ir.Series(p.At(), &l, &c) require.NoError(t, err) require.Equal(t, 0, len(c)) @@ -235,6 +237,17 @@ func TestPersistence_index_e2e(t *testing.T) { lbls, err := readPrometheusLabels("testdata/20k.series", 20000) require.NoError(t, err) + // Sort labels as the index writer expects series in sorted order. 
+ sort.Sort(labels.Slice(lbls)) + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + var input indexWriterSeriesSlice // Generate ChunkMetas for every label set. @@ -258,6 +271,8 @@ func TestPersistence_index_e2e(t *testing.T) { iw, err := newIndexWriter(dir) require.NoError(t, err) + require.NoError(t, iw.AddSymbols(symbols)) + // Population procedure as done by compaction. var ( postings = &memPostings{m: make(map[term][]uint32, 512)} @@ -311,21 +326,24 @@ func TestPersistence_index_e2e(t *testing.T) { ir, err := newIndexReader(dir) require.NoError(t, err) - for p := range mi.postings { - gotp, err := ir.Postings(p.Name, p.Value) + for p := range mi.postings.m { + gotp, err := ir.Postings(p.name, p.value) require.NoError(t, err) - expp, err := mi.Postings(p.Name, p.Value) + expp, err := mi.Postings(p.name, p.value) + + var lset, explset labels.Labels + var chks, expchks []*ChunkMeta for gotp.Next() { require.True(t, expp.Next()) ref := gotp.At() - lset, chks, err := ir.Series(ref) + err := ir.Series(ref, &lset, &chks) require.NoError(t, err) - explset, expchks, err := mi.Series(expp.At()) + err = mi.Series(expp.At(), &explset, &expchks) require.Equal(t, explset, lset) require.Equal(t, expchks, chks) } diff --git a/querier.go b/querier.go index 9533683d3..a54acdd5a 100644 --- a/querier.go +++ b/querier.go @@ -15,7 +15,7 @@ package tsdb import ( "fmt" - "sort" + "sort" "strings" "github.com/prometheus/tsdb/chunks" @@ -134,8 +134,6 @@ type blockQuerier struct { chunks ChunkReader tombstones TombstoneReader - postingsMapper func(Postings) Postings - mint, maxt int64 } @@ -144,10 +142,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { p, absent := pr.Select(ms...) 
- if q.postingsMapper != nil { - p = q.postingsMapper(p) - } - return &blockSeriesSet{ set: &populatedChunkSeries{ set: &baseChunkSeries{ @@ -218,7 +212,7 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) { p := Intersect(its...) - return p, absent + return r.index.SortedPostings(p), absent } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -434,11 +428,14 @@ func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { + var ( + lset labels.Labels + chunks []*ChunkMeta + ) Outer: for s.p.Next() { ref := s.p.At() - lset, chunks, err := s.index.Series(ref) - if err != nil { + if err := s.index.Series(ref, &lset, &chunks); err != nil { s.err = err return false } diff --git a/querier_test.go b/querier_test.go index 53d904087..d8e8f7657 100644 --- a/querier_test.go +++ b/querier_test.go @@ -432,18 +432,18 @@ func TestBlockQuerier(t *testing.T) { maxt: 6, ms: []labels.Matcher{labels.NewPrefixMatcher("p", "abc")}, exp: newListSeriesSet([]Series{ - newSeries(map[string]string{ - "p": "abcd", - "x": "xyz", - }, - []sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, - ), newSeries(map[string]string{ "a": "ab", "p": "abce", }, []sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}}, ), + newSeries(map[string]string{ + "p": "abcd", + "x": "xyz", + }, + []sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}}, + ), }), }, },