diff --git a/tsdb/block.go b/tsdb/block.go index 28b314860..c4976b04e 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -39,8 +39,8 @@ import ( // The methods must be called in the order they are specified in. type IndexWriter interface { // AddSymbols registers all string symbols that are encountered in series - // and other indices. - AddSymbols(sym map[string]struct{}) error + // and other indices. Symbols must be added in sorted order. + AddSymbol(sym string) error // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. @@ -61,9 +61,10 @@ type IndexWriter interface { // IndexReader provides reading access of serialized index data. type IndexReader interface { - // Symbols returns a set of string symbols that may occur in series' labels - // and indices. - Symbols() (map[string]struct{}, error) + // Symbols returns an iterator over sorted string symbols that may occur in + // series' labels and indices. It is not safe to use the returned strings + // beyond the lifetime of the index reader. + Symbols() index.StringIter // LabelValues returns sorted possible label values. 
LabelValues(names ...string) (index.StringTuples, error) @@ -432,9 +433,8 @@ type blockIndexReader struct { b *Block } -func (r blockIndexReader) Symbols() (map[string]struct{}, error) { - s, err := r.ir.Symbols() - return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) +func (r blockIndexReader) Symbols() index.StringIter { + return r.ir.Symbols() } func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) { diff --git a/tsdb/compact.go b/tsdb/compact.go index 3e57512df..a3dae8729 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -650,7 +650,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, var ( set ChunkSeriesSet - allSymbols = make(map[string]struct{}, 1<<16) + symbols index.StringIter closers = []io.Closer{} overlapping bool ) @@ -700,14 +700,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } closers = append(closers, tombsr) - symbols, err := indexr.Symbols() - if err != nil { - return errors.Wrap(err, "read symbols") - } - for s := range symbols { - allSymbols[s] = struct{}{} - } - k, v := index.AllPostingsKey() all, err := indexr.Postings(k, v) if err != nil { @@ -716,15 +708,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, all = indexr.SortedPostings(all) s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) + syms := indexr.Symbols() if i == 0 { set = s + symbols = syms continue } set, err = newCompactionMerger(set, s) if err != nil { return err } + symbols = newMergedStringIter(symbols, syms) } var ( @@ -732,8 +727,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, ref = uint64(0) ) - if err := indexw.AddSymbols(allSymbols); err != nil { - return errors.Wrap(err, "add symbols") + for symbols.Next() { + if err := indexw.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } + } + if symbols.Err() != nil { + return errors.Wrap(symbols.Err(), "next 
symbol") } delIter := &deletedIterator{} @@ -1026,3 +1026,47 @@ func (c *compactionMerger) Err() error { func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return c.l, c.c, c.intervals } + +func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter { + return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()} +} + +type mergedStringIter struct { + a index.StringIter + b index.StringIter + aok, bok bool + cur string +} + +func (m *mergedStringIter) Next() bool { + if (!m.aok && !m.bok) || (m.Err() != nil) { + return false + } + + if !m.aok { + m.cur = m.b.At() + m.bok = m.b.Next() + } else if !m.bok { + m.cur = m.a.At() + m.aok = m.a.Next() + } else if m.b.At() > m.a.At() { + m.cur = m.a.At() + m.aok = m.a.Next() + } else if m.a.At() > m.b.At() { + m.cur = m.b.At() + m.bok = m.b.Next() + } else { // Equal. + m.cur = m.b.At() + m.aok = m.a.Next() + m.bok = m.b.Next() + } + + return true +} +func (m mergedStringIter) At() string { return m.cur } +func (m mergedStringIter) Err() error { + if m.a.Err() != nil { + return m.a.Err() + } + return m.b.Err() +} diff --git a/tsdb/docs/format/index.md b/tsdb/docs/format/index.md index fd6bf043c..1d87970dd 100644 --- a/tsdb/docs/format/index.md +++ b/tsdb/docs/format/index.md @@ -126,7 +126,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc ### Label Index A label index section indexes the existing (combined) values for one or more label names. -The `#names` field determines the number of indexed label names, followed by the total number of entries in the `#entries` field. The body holds #entries / #names tuples of symbol table references, each tuple being of #names length. The value tuples are sorted in lexicographically increasing order. +The `#names` field determines the number of indexed label names, followed by the total number of entries in the `#entries` field. 
The body holds #entries / #names tuples of symbol table references, each tuple being of #names length. The value tuples are sorted in lexicographically increasing order. This is no longer used. ``` ┌───────────────┬────────────────┬────────────────┐ @@ -181,7 +181,7 @@ The sequence of postings sections is finalized by a [postings offset table](#pos A label offset table stores a sequence of label offset entries. Every label offset entry holds the label name and the offset to its values in the label index section. -They are used to track label index sections. They are read into memory when an index file is loaded. +They are used to track label index sections. This is no longer used. ``` ┌─────────────────────┬──────────────────────┐ diff --git a/tsdb/head.go b/tsdb/head.go index 550e9a984..b3c07efb4 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1339,16 +1339,17 @@ func (h *headIndexReader) Close() error { return nil } -func (h *headIndexReader) Symbols() (map[string]struct{}, error) { +func (h *headIndexReader) Symbols() index.StringIter { h.head.symMtx.RLock() - defer h.head.symMtx.RUnlock() - - res := make(map[string]struct{}, len(h.head.symbols)) + res := make([]string, 0, len(h.head.symbols)) for s := range h.head.symbols { - res[s] = struct{}{} + res = append(res, s) } - return res, nil + h.head.symMtx.RUnlock() + + sort.Strings(res) + return index.NewStringListIter(res) } // LabelValues returns the possible label values diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 7a4cc3ae9..fb68c71a2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -123,11 +123,14 @@ type Writer struct { buf1 encoding.Encbuf buf2 encoding.Encbuf - symbols map[string]uint32 // symbol offsets - reverseSymbols map[uint32]string - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists offsets - labelNames map[string]uint64 // label names, and their usage + numSymbols int + symbols *Symbols + symbolFile 
*fileutil.MmapFile + lastSymbol string + + labelIndexes []labelIndexHashEntry // label index offsets + postings []postingsHashEntry // postings lists offsets + labelNames map[string]uint64 // label names, and their usage // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -275,7 +278,13 @@ func (w *Writer) ensureStage(s indexWriterStage) error { switch s { case idxStageSymbols: w.toc.Symbols = w.pos + if err := w.startSymbols(); err != nil { + return err + } case idxStageSeries: + if err := w.finishSymbols(); err != nil { + return err + } w.toc.Series = w.pos case idxStageLabelIndex: @@ -339,16 +348,16 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta for _, l := range lset { // here we have an index for the symbol file if v2, otherwise it's an offset - index, ok := w.symbols[l.Name] - if !ok { - return errors.Errorf("symbol entry for %q does not exist", l.Name) + index, err := w.symbols.ReverseLookup(l.Name) + if err != nil { + return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) } w.labelNames[l.Name]++ w.buf2.PutUvarint32(index) - index, ok = w.symbols[l.Value] - if !ok { - return errors.Errorf("symbol entry for %q does not exist", l.Value) + index, err = w.symbols.ReverseLookup(l.Value) + if err != nil { + return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) } w.buf2.PutUvarint32(index) } @@ -388,58 +397,66 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return nil } -func (w *Writer) AddSymbols(sym map[string]struct{}) error { +func (w *Writer) startSymbols() error { + // We are at w.toc.Symbols. + // Leave 4 bytes of space for the length, and another 4 for the number of symbols + // which will both be calculated later. 
+ return w.write([]byte("alenblen")) +} + +func (w *Writer) AddSymbol(sym string) error { if err := w.ensureStage(idxStageSymbols); err != nil { return err } - // Generate sorted list of strings we will store as reference table. - symbols := make([]string, 0, len(sym)) - - for s := range sym { - symbols = append(symbols, s) + if w.numSymbols != 0 && sym <= w.lastSymbol { + return errors.Errorf("symbol %q out-of-order", sym) } - sort.Strings(symbols) - - startPos := w.pos - // Leave 4 bytes of space for the length, which will be calculated later. - if err := w.write([]byte("alen")); err != nil { - return err - } - w.crc32.Reset() - + w.lastSymbol = sym + w.numSymbols++ w.buf1.Reset() - w.buf1.PutBE32int(len(symbols)) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err - } - - w.symbols = make(map[string]uint32, len(symbols)) - w.reverseSymbols = make(map[uint32]string, len(symbols)) - - for index, s := range symbols { - w.symbols[s] = uint32(index) - w.reverseSymbols[uint32(index)] = s - w.buf1.Reset() - w.buf1.PutUvarintStr(s) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err - } - } - - // Write out the length. - w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) - if err := w.writeAt(w.buf1.Get(), startPos); err != nil { - return err - } - - w.buf1.Reset() - w.buf1.PutHashSum(w.crc32) + w.buf1.PutUvarintStr(sym) return w.write(w.buf1.Get()) } +func (w *Writer) finishSymbols() error { + // Write out the length and symbol count. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4)) + w.buf1.PutBE32int(int(w.numSymbols)) + if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil { + return err + } + + hashPos := w.pos + // Leave space for the hash. We can only calculate it + // now that the number of symbols is known, so mmap and do it from there. 
+ if err := w.write([]byte("hash")); err != nil { + return err + } + if err := w.fbuf.Flush(); err != nil { + return err + } + + var err error + w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name()) + if err != nil { + return err + } + hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable) + w.buf1.Reset() + w.buf1.PutBE32(hash) + if err := w.writeAt(w.buf1.Get(), hashPos); err != nil { + return err + } + + // Load in the symbol table efficiently for the rest of the index writing. + w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols)) + if err != nil { + return errors.Wrap(err, "read symbols") + } + return nil +} + func (w *Writer) WriteLabelIndex(names []string, values []string) error { if len(values)%len(names) != 0 { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) @@ -481,12 +498,12 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // here we have an index for the symbol file if v2, otherwise it's an offset for _, v := range valt.entries { - index, ok := w.symbols[v] - if !ok { - return errors.Errorf("symbol entry for %q does not exist", v) + sid, err := w.symbols.ReverseLookup(v) + if err != nil { + return errors.Errorf("symbol entry for %q does not exist: %v", v, err) } w.buf1.Reset() - w.buf1.PutBE32(index) + w.buf1.PutBE32(sid) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err @@ -585,7 +602,7 @@ func (w *Writer) writePostingsOffsetTable() error { return w.write(w.buf1.Get()) } -const indexTOCLen = 6*8 + 4 +const indexTOCLen = 6*8 + crc32.Size func (w *Writer) writeTOC() error { w.buf1.Reset() @@ -629,8 +646,8 @@ func (w *Writer) writePostings() error { return errors.Errorf("series not 16-byte aligned at %d", startPos) } offsets = append(offsets, uint32(startPos/16)) - // Skip to next series. The 4 is for the CRC32. - d.Skip(d.Uvarint() + 4) + // Skip to next series. 
+ d.Skip(d.Uvarint() + crc32.Size) if err := d.Err(); err != nil { return nil } @@ -654,9 +671,13 @@ func (w *Writer) writePostings() error { names = names[1:] } - nameSymbols := map[uint32]struct{}{} + nameSymbols := map[uint32]string{} for _, name := range batchNames { - nameSymbols[w.symbols[name]] = struct{}{} + sid, err := w.symbols.ReverseLookup(name) + if err != nil { + return err + } + nameSymbols[sid] = name } // Label name -> label value -> positions. postings := map[uint32]map[uint32][]uint32{} @@ -679,14 +700,11 @@ func (w *Writer) writePostings() error { if _, ok := postings[lno]; !ok { postings[lno] = map[uint32][]uint32{} } - if _, ok := postings[lno][lvo]; !ok { - postings[lno][lvo] = []uint32{} - } postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) } } - // Skip to next series. The 4 is for the CRC32. - d.Skip(l - (startLen - d.Len()) + 4) + // Skip to next series. + d.Skip(l - (startLen - d.Len()) + crc32.Size) if err := d.Err(); err != nil { return nil } @@ -694,15 +712,23 @@ func (w *Writer) writePostings() error { for _, name := range batchNames { // Write out postings for this label name. - values := make([]uint32, 0, len(postings[w.symbols[name]])) - for v := range postings[w.symbols[name]] { + sid, err := w.symbols.ReverseLookup(name) + if err != nil { + return err + } + values := make([]uint32, 0, len(postings[sid])) + for v := range postings[sid] { values = append(values, v) } // Symbol numbers are in order, so the strings will also be in order. 
sort.Sort(uint32slice(values)) for _, v := range values { - if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil { + value, err := w.symbols.Lookup(v) + if err != nil { + return err + } + if err := w.writePosting(name, value, postings[sid][v]); err != nil { return err } } @@ -765,6 +791,11 @@ func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err } + if w.symbolFile != nil { + if err := w.symbolFile.Close(); err != nil { + return err + } + } if err := w.fbuf.Flush(); err != nil { return err } @@ -782,6 +813,18 @@ type StringTuples interface { At(i int) ([]string, error) } +// StringIter iterates over a sorted list of strings. +type StringIter interface { + // Next advances the iterator and returns true if another value was found. + Next() bool + + // At returns the value at the current iterator position. + At() string + + // Err returns the last error of the iterator. + Err() error +} + type Reader struct { b ByteSlice toc *TOC @@ -1038,6 +1081,9 @@ func (s Symbols) Lookup(o uint32) (string, error) { } func (s Symbols) ReverseLookup(sym string) (uint32, error) { + if len(s.offsets) == 0 { + return 0, errors.Errorf("unknown symbol %q - no symbols", sym) + } i := sort.Search(len(s.offsets), func(i int) bool { // Any decoding errors here will be lost, however // we already read through all of this at startup. 
@@ -1077,24 +1123,43 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) { return uint32(s.bs.Len() - lastLen), nil } -func (s Symbols) All() (map[string]struct{}, error) { - d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable) - cnt := d.Be32int() - res := make(map[string]struct{}, cnt) - for d.Err() == nil && cnt > 0 { - res[d.UvarintStr()] = struct{}{} - cnt-- - } - if d.Err() != nil { - return nil, d.Err() - } - return res, nil -} - func (s Symbols) Size() int { return len(s.offsets) * 8 } +func (s Symbols) Iter() StringIter { + d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable) + cnt := d.Be32int() + return &symbolsIter{ + d: d, + cnt: cnt, + } +} + +// symbolsIter implements StringIter. +type symbolsIter struct { + d encoding.Decbuf + cnt int + cur string + err error +} + +func (s *symbolsIter) Next() bool { + if s.cnt == 0 || s.err != nil { + return false + } + s.cur = yoloString(s.d.UvarintBytes()) + s.cnt-- + if s.d.Err() != nil { + s.err = s.d.Err() + return false + } + return true +} + +func (s symbolsIter) At() string { return s.cur } +func (s symbolsIter) Err() error { return s.err } + // ReadOffsetTable reads an offset table and at the given position calls f for each // found entry. If f returns an error it stops decoding and returns the received error. func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error { @@ -1137,9 +1202,9 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) { return r.symbols.Lookup(o) } -// Symbols returns a set of symbols that exist within the index. -func (r *Reader) Symbols() (map[string]struct{}, error) { - return r.symbols.All() +// Symbols returns an iterator over the symbols that exist within the index. +func (r *Reader) Symbols() StringIter { + return r.symbols.Iter() } // SymbolTableSize returns the symbol table size in bytes. 
@@ -1359,6 +1424,28 @@ func (t *stringTuples) Less(i, j int) bool { return false } +// NewStringListIter returns a StringIter for the given sorted list of strings. +func NewStringListIter(s []string) StringIter { + return &stringListIter{l: s} +} + +// stringListIter implements StringIter. +type stringListIter struct { + l []string + cur string +} + +func (s *stringListIter) Next() bool { + if len(s.l) == 0 { + return false + } + s.cur = s.l[0] + s.l = s.l[1:] + return true +} +func (s stringListIter) At() string { return s.cur } +func (s stringListIter) Err() error { return nil } + // Decoder provides decoding methods for the v1 and v2 index file format. // // It currently does not contain decoding methods for all entry types but can be extended diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 408fa78c2..07f8f8a47 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -186,15 +186,12 @@ func TestIndexRW_Postings(t *testing.T) { labels.FromStrings("a", "1", "b", "4"), } - err = iw.AddSymbols(map[string]struct{}{ - "a": {}, - "b": {}, - "1": {}, - "2": {}, - "3": {}, - "4": {}, - }) - testutil.Ok(t, err) + testutil.Ok(t, iw.AddSymbol("1")) + testutil.Ok(t, iw.AddSymbol("2")) + testutil.Ok(t, iw.AddSymbol("3")) + testutil.Ok(t, iw.AddSymbol("4")) + testutil.Ok(t, iw.AddSymbol("a")) + testutil.Ok(t, iw.AddSymbol("b")) // Postings lists are only written if a series with the respective // reference was added before. 
@@ -280,7 +277,14 @@ func TestPostingsMany(t *testing.T) { symbols["i"] = struct{}{} symbols["foo"] = struct{}{} symbols["bar"] = struct{}{} - testutil.Ok(t, iw.AddSymbols(symbols)) + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + testutil.Ok(t, iw.AddSymbol(s)) + } for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) @@ -390,7 +394,14 @@ func TestPersistence_index_e2e(t *testing.T) { iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename)) testutil.Ok(t, err) - testutil.Ok(t, iw.AddSymbols(symbols)) + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + testutil.Ok(t, iw.AddSymbol(s)) + } // Population procedure as done by compaction. var ( @@ -479,14 +490,18 @@ func TestPersistence_index_e2e(t *testing.T) { } } - gotSymbols, err := ir.Symbols() - testutil.Ok(t, err) - - testutil.Equals(t, len(mi.symbols), len(gotSymbols)) - for s := range mi.symbols { - _, ok := gotSymbols[s] - testutil.Assert(t, ok, "") + gotSymbols := []string{} + it := ir.Symbols() + for it.Next() { + gotSymbols = append(gotSymbols, it.At()) } + testutil.Ok(t, it.Err()) + expSymbols := []string{} + for s := range mi.symbols { + expSymbols = append(expSymbols, s) + } + sort.Strings(expSymbols) + testutil.Equals(t, expSymbols, gotSymbols) testutil.Ok(t, ir.Close()) } diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index c7bb422ed..913061e83 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -24,7 +24,7 @@ type mockIndexWriter struct { series []seriesSamples } -func (mockIndexWriter) AddSymbols(sym map[string]struct{}) error { return nil } +func (mockIndexWriter) AddSymbol(sym string) error { return nil } func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error { i := -1 for j, s := range m.series { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 
df8a0cb3b..3f69d7bc0 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1316,8 +1316,13 @@ func newMockIndex() mockIndex { return ix } -func (m mockIndex) Symbols() (map[string]struct{}, error) { - return m.symbols, nil +func (m mockIndex) Symbols() index.StringIter { + l := []string{} + for s := range m.symbols { + l = append(l, s) + } + sort.Strings(l) + return index.NewStringListIter(l) } func (m *mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {