From 35b62f001e835fe82da9381532dc3d007e3a82b9 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 26 Apr 2017 18:01:13 +0200 Subject: [PATCH] Change offset table layout, add TOC, ... --- Documentation/format/chunks.md | 14 +- Documentation/format/index.md | 244 ++++++++++-------- db.go | 2 - index.go | 437 +++++++++++++++++++++------------ 4 files changed, 434 insertions(+), 263 deletions(-) diff --git a/Documentation/format/chunks.md b/Documentation/format/chunks.md index d5b71f0b6..ce0829f0f 100644 --- a/Documentation/format/chunks.md +++ b/Documentation/format/chunks.md @@ -3,11 +3,11 @@ The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. ``` - ┌─────────────────────────────┬─────────────────────┐ - │ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte> │ - ├─────────────────────────────┴─────────────────────┤ - │ ┌──────────────┬───────────────────┬────────┐ │ - │ │ len │ encoding <1 byte> │ data │ ... │ - │ └──────────────┴───────────────────┴────────┘ │ - └───────────────────────────────────────────────────┘ +┌─────────────────────────────┬─────────────────────┐ +│ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte> │ +├─────────────────────────────┴─────────────────────┤ +│ ┌──────────────┬───────────────────┬────────┐ │ +│ │ len │ encoding <1 byte> │ data │ ... │ +│ └──────────────┴───────────────────┴────────┘ │ +└───────────────────────────────────────────────────┘ ``` diff --git a/Documentation/format/index.md b/Documentation/format/index.md index c5631c725..f94b518b5 100644 --- a/Documentation/format/index.md +++ b/Documentation/format/index.md @@ -1,27 +1,35 @@ -# Chunks Disk Format +# Index Disk Format The following describes the format of the `index` file found in each block directory. 
``` - ┌────────────────────────────┬─────────────────────┐ - │ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte> │ - ├────────────────────────────┴─────────────────────┤ - │ ┌──────────────────────────────────────────────┐ │ - │ │ Symbol Table │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Series │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Label Index │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Postings │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Body ... │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Body ... │ │ - │ ├──────────────────────────────────────────────┤ │ - │ │ Body ... │ │ - │ └──────────────────────────────────────────────┘ │ - └──────────────────────────────────────────────────┘ +┌────────────────────────────┬─────────────────────┐ +│ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte> │ +├────────────────────────────┴─────────────────────┤ +│ ┌──────────────────────────────────────────────┐ │ +│ │ Symbol Table │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Series │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Label Index 1 │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ ... │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Label Index N │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Label Index Table │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Postings 1 │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ ... 
│ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Postings N │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ Postings Table │ │ +│ ├──────────────────────────────────────────────┤ │ +│ │ TOC │ │ +│ └──────────────────────────────────────────────┘ │ +└──────────────────────────────────────────────────┘ ``` @@ -33,19 +41,19 @@ The section contains a sequence of the raw string data, each prefixed with the s Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order. ``` - ┌─────────────────────────┬───────────────┐ - │ count(symbols) <4 byte> │ len <4 byte> │ - ├─────────────────────────┴───────────────┤ - │ ┌─────────────────────┬───────────────┐ │ - │ │ len(str_1) │ str_1 │ │ - │ ├─────────────────────┴───────────────┤ │ - │ │ . . . │ │ - │ ├─────────────────────┬───────────────┤ │ - │ │ len(str_n) │ str_1 │ │ - │ └─────────────────────┴───────────────┘ │ - ├─────────────────────────────────────────┤ - │ CRC32 <4 byte> │ - └─────────────────────────────────────────┘ +┌─────────────────────────┬───────────────┐ +│ count(symbols) <4 byte> │ len <4 byte> │ +├─────────────────────────┴───────────────┤ +│ ┌─────────────────────┬───────────────┐ │ +│ │ len(str_1) │ str_1 │ │ +│ ├─────────────────────┴───────────────┤ │ +│ │ . . . │ │ +│ ├─────────────────────┬───────────────┤ │ +│ │ len(str_n) │ str_1 │ │ +│ └─────────────────────┴───────────────┘ │ +├─────────────────────────────────────────┤ +│ CRC32 <4 byte> │ +└─────────────────────────────────────────┘ ``` @@ -55,45 +63,45 @@ The section contains a sequence of series that hold the label set of the series The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. 
``` - ┌───────────────────────────────────────┐ - │ count(series) <4 byte> │ - ├───────────────────────────────────────┤ - │ ┌───────────────────────────────────┐ │ - │ │ series_1 │ │ - │ ├───────────────────────────────────┤ │ - │ │ . . . │ │ - │ ├───────────────────────────────────┤ │ - │ │ series_n │ │ - │ └───────────────────────────────────┘ │ - └───────────────────────────────────────┘ +┌───────────────────────────────────────┐ +│ count(series) <4 byte> │ +├───────────────────────────────────────┤ +│ ┌───────────────────────────────────┐ │ +│ │ series_1 │ │ +│ ├───────────────────────────────────┤ │ +│ │ . . . │ │ +│ ├───────────────────────────────────┤ │ +│ │ series_n │ │ +│ └───────────────────────────────────┘ │ +└───────────────────────────────────────┘ ``` Every series holds a list of label pairs and chunks. The label pairs reference the symbol table and the chunks an address in one of the block's chunk files. ``` - ┌─────────────────────────────────────────────────────────┐ - │ len │ - ├─────────────────────────────────────────────────────────┤ - │ ┌──────────────────┬──────────────────────────────────┐ │ - │ │ │ ┌──────────────────────────┐ │ │ - │ │ │ │ ref(l_i.name) │ │ │ - │ │ #labels │ ├──────────────────────────┤ ... │ │ - │ │ │ │ ref(l_i.value) │ │ │ - │ │ │ └──────────────────────────┘ │ │ - │ ├──────────────────┼──────────────────────────────────┤ │ - │ │ │ ┌──────────────────────────┐ │ │ - │ │ │ │ c_i.mint │ │ │ - │ │ │ ├──────────────────────────┤ │ │ - │ │ │ │ c_i.maxt │ │ │ - │ │ #chunks │ ├──────────────────────────┤ ... 
│ │ - │ │ │ │ ref(c_i.data) │ │ │ - │ │ │ ├──────────────────────────┤ │ │ - │ │ │ │ crc32(c_i.data) │ │ │ - │ │ │ └──────────────────────────┘ │ │ - │ └──────────────────┴──────────────────────────────────┘ │ - ├─────────────────────────────────────────────────────────┤ - │ CRC32 <4 byte> │ - └─────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────┐ +│ len │ +├─────────────────────────────────────────────────────────┤ +│ ┌──────────────────┬──────────────────────────────────┐ │ +│ │ │ ┌──────────────────────────┐ │ │ +│ │ │ │ ref(l_i.name) │ │ │ +│ │ #labels │ ├──────────────────────────┤ ... │ │ +│ │ │ │ ref(l_i.value) │ │ │ +│ │ │ └──────────────────────────┘ │ │ +│ ├──────────────────┼──────────────────────────────────┤ │ +│ │ │ ┌──────────────────────────┐ │ │ +│ │ │ │ c_i.mint │ │ │ +│ │ │ ├──────────────────────────┤ │ │ +│ │ │ │ c_i.maxt │ │ │ +│ │ #chunks │ ├──────────────────────────┤ ... │ │ +│ │ │ │ ref(c_i.data) │ │ │ +│ │ │ ├──────────────────────────┤ │ │ +│ │ │ │ crc32(c_i.data) │ │ │ +│ │ │ └──────────────────────────┘ │ │ +│ └──────────────────┴──────────────────────────────────┘ │ +├─────────────────────────────────────────────────────────┤ +│ CRC32 <4 byte> │ +└─────────────────────────────────────────────────────────┘ ``` The CRC checksum is calculated over the series contents of the index concatenated with the data of its chunks (with encoding byte, without length). @@ -104,43 +112,87 @@ The CRC checksum is calculated over the series contents of the index concatenate The label index indexes holds lists of possible values for label names. A sequence of label index blocks follow on the series entries. 
``` - ┌─────────────────────────────────────────────────────────┐ - │ len │ - ├─────────────────────────────────────────────────────────┤ - │ ┌──────────────────┬──────────────────────────────────┐ │ - │ │ │ ┌──────────────────────────┐ │ │ - │ │ │ │ ref(value[0]) │ │ │ - │ │ │ ├──────────────────────────┤ │ │ - │ │ n = len(names) │ │ ... │ ... │ │ - │ │ │ ├──────────────────────────┤ │ │ - │ │ │ │ ref(value[n]) │ │ │ - │ │ │ └──────────────────────────┘ │ │ - │ └──────────────────┴──────────────────────────────────┘ │ - ├─────────────────────────────────────────────────────────┤ - │ CRC32 <4 byte> │ - └─────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────┐ +│ len │ +├─────────────────────────────────────────────────────────┤ +│ ┌──────────────────┬──────────────────────────────────┐ │ +│ │ │ ┌──────────────────────────┐ │ │ +│ │ │ │ ref(value[0]) <4 byte> │ │ │ +│ │ │ ├──────────────────────────┤ │ │ +│ │ n = len(names) │ │ ... │ ... │ │ +│ │ │ ├──────────────────────────┤ │ │ +│ │ │ │ ref(value[n]) <4 byte> │ │ │ +│ │ │ └──────────────────────────┘ │ │ +│ └──────────────────┴──────────────────────────────────┘ │ +├─────────────────────────────────────────────────────────┤ +│ CRC32 <4 byte> │ +└─────────────────────────────────────────────────────────┘ ``` +The sequence of label index blocks is finalized by a lookup table pointing to the beginning of each label index block. It is simply a list of entries that are read into an in-memory hashmap when the index is loaded. ### Postings Postings are postings lists that map label pairs to series they occur in. ``` - ┌─────────────────────────────────────────────────┐ - │ len │ - ├─────────────────────────────────────────────────┤ - │ ┌─────────────────────────────────────────────┐ │ - │ │ ref(series[0]) <4 byte> │ │ - │ ├─────────────────────────────────────────────┤ │ - │ │ ... 
                                        │ │
- │ ├─────────────────────────────────────────────┤ │
- │ │ ref(series[n]) <4 byte>                     │ │
- │ └─────────────────────────────────────────────┘ │
- ├─────────────────────────────────────────────────┤
- │ CRC32 <4 byte>                                  │
- └─────────────────────────────────────────────────┘
+┌─────────────────────────────────────────────────┐
+│ len                                             │
+├─────────────────────────────────────────────────┤
+│ ┌─────────────────────────────────────────────┐ │
+│ │ ref(series[0]) <4 byte>                     │ │
+│ ├─────────────────────────────────────────────┤ │
+│ │ ...                                         │ │
+│ ├─────────────────────────────────────────────┤ │
+│ │ ref(series[n]) <4 byte>                     │ │
+│ └─────────────────────────────────────────────┘ │
+├─────────────────────────────────────────────────┤
+│ CRC32 <4 byte>                                  │
+└─────────────────────────────────────────────────┘
+```
+
+### Offset Table
+
+
+```
+┌─────────────────────────┬───────────────┐
+│ count(entries) <4 byte> │ len <4 byte>  │
+├─────────────────────────┴───────────────┤
+│ ┌─────────────────────────────────────┐ │
+│ │ n = len(strs)                       │ │
+│ ├─────────────────────────────────────┤ │
+│ │ len(strs[0])                        │ │
+│ ├─────────────────────────────────────┤ │
+│ │ ...                                 │ │
+│ ├─────────────────────────────────────┤ │
+│ │ strs[n]                             │ │
+│ ├─────────────────────────────────────┤ │
+│ │ offset                              │ │
+│ └─────────────────────────────────────┘ │
+│                  . . .                  │
+├─────────────────────────────────────────┤
+│ CRC32 <4 byte>                          │
+└─────────────────────────────────────────┘
 ```
-### 
\ No newline at end of file
+
+### TOC
+
+The table of contents serves as an entry point to the entire index. Its size is fixed. 
+ +``` +┌─────────────────────────────────────────────┐ +│ ref(symbols) <8 byte> │ +├─────────────────────────────────────────────┤ +│ ref(series) <8 byte> │ +├─────────────────────────────────────────────┤ +│ ref(label indices) <8 byte> │ +├─────────────────────────────────────────────┤ +│ ref(label indices table) <8 byte> │ +├─────────────────────────────────────────────┤ +│ ref(postings) <8 byte> │ +├─────────────────────────────────────────────┤ +│ ref(postings table) <8 byte> │ +└─────────────────────────────────────────────┘ +``` \ No newline at end of file diff --git a/db.go b/db.go index 66bd6679a..13ab5b7eb 100644 --- a/db.go +++ b/db.go @@ -95,8 +95,6 @@ type Appender interface { Rollback() error } -const sep = '\xff' - // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { diff --git a/index.go b/index.go index 53b437dbd..8a2dbdb85 100644 --- a/index.go +++ b/index.go @@ -24,6 +24,9 @@ import ( "path/filepath" "sort" "strings" + "unsafe" + + "math" "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" @@ -35,8 +38,6 @@ const ( MagicIndex = 0xBAAAD700 indexFormatV1 = 1 - - indexSeriesFormatV1 = 1 ) const compactionPageBytes = minSectorSize * 64 @@ -104,15 +105,16 @@ type IndexWriter interface { // indexWriter implements the IndexWriter interface for the standard // serialization format. type indexWriter struct { - f *os.File - fbuf *bufio.Writer - pos int + f *os.File + fbuf *bufio.Writer + pos uint64 + + toc indexTOC stage indexWriterStage // Reusable memory. 
buf1 encbuf buf2 encbuf - b []byte uint32s []uint32 series map[uint32]*indexWriterSeries @@ -123,6 +125,15 @@ type indexWriter struct { crc32 hash.Hash } +type indexTOC struct { + symbols uint64 + series uint64 + labelIndices uint64 + labelIndicesTable uint64 + postings uint64 + postingsTable uint64 +} + func newIndexWriter(dir string) (*indexWriter, error) { df, err := fileutil.OpenDir(dir) if err != nil { @@ -145,7 +156,6 @@ func newIndexWriter(dir string) (*indexWriter, error) { // Reusable memory. buf1: encbuf{b: make([]byte, 0, 1<<22)}, buf2: encbuf{b: make([]byte, 0, 1<<22)}, - b: make([]byte, 0, 1<<23), uint32s: make([]uint32, 0, 1<<15), // Caches. @@ -162,21 +172,75 @@ func newIndexWriter(dir string) (*indexWriter, error) { func (w *indexWriter) write(bufs ...[]byte) error { for _, b := range bufs { n, err := w.fbuf.Write(b) - w.pos += n + w.pos += uint64(n) if err != nil { return err } + // For now the index file must not grow beyond 4GiB. Some of the fixed-sized + // offset references in v1 are only 4 byte large. + // Once we move to compressed/varint representations in those areas, this limitation + // can be lifted. + if w.pos > math.MaxUint32 { + return errors.Errorf("exceeding max size of 4GiB") + } } return nil } +// ensureStage handles transitions between write stages and ensures that IndexWriter +// methods are called in an order valid for the implementation. +func (w *indexWriter) ensureStage(s indexWriterStage) error { + if w.stage == s { + return nil + } + if w.stage > s { + return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) + } + + // Complete population stage by writing symbols and series. + if w.stage == idxStagePopulate { + w.toc.symbols = w.pos + if err := w.writeSymbols(); err != nil { + return err + } + w.toc.series = w.pos + if err := w.writeSeries(); err != nil { + return err + } + } + + // Mark start of sections in table of contents. 
+ switch s { + case idxStageLabelIndex: + w.toc.labelIndices = w.pos + + case idxStagePostings: + w.toc.labelIndicesTable = w.pos + if err := w.writeOffsetTable(w.labelIndexes); err != nil { + return err + } + w.toc.postings = w.pos + + case idxStageDone: + w.toc.postingsTable = w.pos + if err := w.writeOffsetTable(w.postings); err != nil { + return err + } + if err := w.writeTOC(); err != nil { + return err + } + } + + w.stage = s + return nil +} + func (w *indexWriter) writeMeta() error { - b := [5]byte{} + w.buf1.reset() + w.buf1.putBE32(MagicIndex) + w.buf1.putByte(indexFormatV1) - binary.BigEndian.PutUint32(b[:4], MagicIndex) - b[4] = flagStd - - return w.write(b[:]) + return w.write(w.buf1.get()) } func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { @@ -210,10 +274,12 @@ func (w *indexWriter) writeSymbols() error { w.buf2.reset() for _, s := range symbols { - w.symbols[s] = uint32(w.pos + headerSize + w.buf2.len()) + w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) - w.buf2.putUvarint(len(s)) - w.buf2.putString(s) + // NOTE: len(s) gives the number of runes, not the number of bytes. + // Therefore the read-back length for strings with unicode characters will + // be off when not using putCstr. + w.buf2.putUvarintStr(s) } w.buf1.putBE32int(len(symbols)) @@ -277,28 +343,6 @@ func (w *indexWriter) writeSeries() error { return nil } -// ensureStage handles transitions between write stages and ensures that IndexWriter -// methods are called in an order valid for the implementation. 
-func (w *indexWriter) ensureStage(s indexWriterStage) error { - if w.stage == s { - return nil - } - if w.stage > s { - return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) - } - if w.stage == idxStagePopulate { - if err := w.writeSymbols(); err != nil { - return err - } - if err := w.writeSeries(); err != nil { - return err - } - w.stage = s - return nil - } - return nil -} - func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { if err := w.ensureStage(idxStageLabelIndex); err != nil { return errors.Wrap(err, "ensure stage") @@ -311,8 +355,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { sort.Sort(valt) w.labelIndexes = append(w.labelIndexes, hashEntry{ - name: strings.Join(names, string(sep)), - offset: uint32(w.pos), + keys: names, + offset: w.pos, }) w.buf2.reset() @@ -331,16 +375,52 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { return errors.Wrap(err, "write label index") } +// writeOffsetTable writes a sequence of readable hash entries. 
+func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { + w.buf1.reset() + w.buf1.putBE32int(len(entries)) + + w.buf2.reset() + + for _, e := range entries { + w.buf2.putUvarint(len(e.keys)) + for _, k := range e.keys { + w.buf2.putUvarintStr(k) + } + w.buf2.putUvarint64(e.offset) + } + + w.buf1.putBE32int(w.buf2.len()) + w.buf2.putHash(w.crc32) + + return w.write(w.buf1.get(), w.buf2.get()) +} + +const indexTOCLen = 6*8 + 4 + +func (w *indexWriter) writeTOC() error { + w.buf1.reset() + + w.buf1.putBE64(w.toc.symbols) + w.buf1.putBE64(w.toc.series) + w.buf1.putBE64(w.toc.labelIndices) + w.buf1.putBE64(w.toc.labelIndicesTable) + w.buf1.putBE64(w.toc.postings) + w.buf1.putBE64(w.toc.postingsTable) + + w.buf1.putHash(w.crc32) + + return w.write(w.buf1.get()) +} + func (w *indexWriter) WritePostings(name, value string, it Postings) error { if err := w.ensureStage(idxStagePostings); err != nil { return errors.Wrap(err, "ensure stage") } - key := name + string(sep) + value - w.postings = append(w.postings, hashEntry{ - name: key, - offset: uint32(w.pos), + keys: []string{name, value}, + offset: w.pos, }) // Order of the references in the postings list does not imply order @@ -382,62 +462,12 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } type hashEntry struct { - name string - offset uint32 -} - -func (w *indexWriter) writeHashmap(h []hashEntry) error { - w.b = append(w.b[:0], flagStd, 0, 0, 0, 0) - buf := [binary.MaxVarintLen32]byte{} - - for _, e := range h { - n := binary.PutUvarint(buf[:], uint64(len(e.name))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, e.name...) - - n = binary.PutUvarint(buf[:], uint64(e.offset)) - w.b = append(w.b, buf[:n]...) 
- } - - binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5)) - - w.crc32.Reset() - if _, err := w.crc32.Write(w.b[5:]); err != nil { - return errors.Wrap(err, "calculate label index CRC32 checksum") - } - w.b = w.crc32.Sum(w.b) - - return w.write(w.b) -} - -func (w *indexWriter) finalize() error { - if err := w.ensureStage(idxStageDone); err != nil { - return errors.Wrap(err, "ensure stage") - } - // Write out hash maps to jump to correct label index and postings sections. - lo := uint32(w.pos) - if err := w.writeHashmap(w.labelIndexes); err != nil { - return err - } - - po := uint32(w.pos) - if err := w.writeHashmap(w.postings); err != nil { - return err - } - - // Terminate index file with offsets to hashmaps. This is the entry Pointer - // for any index query. - // TODO(fabxc): also store offset to series section to allow plain - // iteration over all existing series? - b := [8]byte{} - binary.BigEndian.PutUint32(b[:4], lo) - binary.BigEndian.PutUint32(b[4:], po) - - return w.write(b[:]) + keys []string + offset uint64 } func (w *indexWriter) Close() error { - if err := w.finalize(); err != nil { + if err := w.ensureStage(idxStageDone); err != nil { return err } if err := w.fbuf.Flush(); err != nil { @@ -478,7 +508,8 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b []byte + toc indexTOC // Close that releases the underlying resources of the byte slice. c io.Closer @@ -509,57 +540,81 @@ func newIndexReader(dir string) (*indexReader, error) { return nil, errors.Errorf("invalid magic number %x", m) } - // The last two 4 bytes hold the pointers to the hashmaps. 
- loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) - poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) + if err := r.readTOC(); err != nil { + return nil, errors.Wrap(err, "read TOC") + } - flag, b, err := r.section(loff) + r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { - return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) + return nil, errors.Wrap(err, "read label index table") } - if r.labels, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read label index hashmap") - } - flag, b, err = r.section(poff) + r.postings, err = r.readOffsetTable(r.toc.postingsTable) if err != nil { - return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) - } - if r.postings, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read postings hashmap") + return nil, errors.Wrap(err, "read postings table") } return r, nil } -func readHashmap(flag byte, b []byte) (map[string]uint32, error) { - if flag != flagStd { - return nil, errInvalidFlag +func (r *indexReader) readTOC() error { + if len(r.b) < indexTOCLen { + return errInvalidSize } - h := make(map[string]uint32, 512) + b := r.b[len(r.b)-indexTOCLen:] - for len(b) > 0 { - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read key length") - } - b = b[n:] + r.toc.symbols = binary.BigEndian.Uint64(b[0:8]) + r.toc.series = binary.BigEndian.Uint64(b[8:16]) + r.toc.labelIndices = binary.BigEndian.Uint64(b[16:24]) + r.toc.labelIndicesTable = binary.BigEndian.Uint64(b[24:32]) + r.toc.postings = binary.BigEndian.Uint64(b[32:40]) + r.toc.postingsTable = binary.BigEndian.Uint64(b[40:48]) - if len(b) < int(l) { - return nil, errors.Wrap(errInvalidSize, "read key") - } - s := string(b[:l]) - b = b[l:] + // TODO(fabxc): validate checksum. 
- o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read offset value") - } - b = b[n:] + return nil +} - h[s] = uint32(o) +func (r *indexReader) decbufAt(off int) decbuf { + if len(r.b) < off { + return decbuf{e: errInvalidSize} + } + return decbuf{b: r.b[off:]} +} + +// readOffsetTable reads an offset table at the given position and returns a map +// with the key strings concatenated by the 0xff unicode non-character. +func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { + // A table might not have been written at all, in which case the position + // is zeroed out. + if off == 0 { + return nil, nil } - return h, nil + const sep = "\xff" + + var ( + d1 = r.decbufAt(int(off)) + cnt = d1.readBE32() + el = d1.readBE32() + d2 = d1.get(int(el)) + ) + + res := make(map[string]uint32, 512) + + for d2.err() == nil && d2.len() > 0 && cnt > 0 { + keyCount := int(d2.readUvarint()) + keys := make([]string, 0, keyCount) + + for i := 0; i < keyCount; i++ { + keys = append(keys, d2.readUvarintStr()) + } + res[strings.Join(keys, sep)] = uint32(d2.readUvarint()) + + cnt-- + } + + // TODO(fabxc): verify checksum from remainer of d1. + return res, d2.err() } func (r *indexReader) Close() error { @@ -619,7 +674,9 @@ func (r *indexReader) getSized(off uint32) ([]byte, error) { } func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { - key := strings.Join(names, string(sep)) + const sep = "\xff" + + key := strings.Join(names, sep) off, ok := r.labels[key] if !ok { // XXX(fabxc): hot fix. 
Should return a partial data error and handle cases @@ -652,6 +709,8 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } func (r *indexReader) LabelIndices() ([][]string, error) { + const sep = "\xff" + res := [][]string{} for s := range r.labels { @@ -744,6 +803,8 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { } func (r *indexReader) Postings(name, value string) (Postings, error) { + const sep = "\xff" + key := name + string(sep) + value off, ok := r.postings[key] @@ -832,30 +893,33 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { return res, nil } +// enbuf is a helper type to populate a byte slice with various types. type encbuf struct { b []byte c [binary.MaxVarintLen64]byte } -func (e *encbuf) reset() { - e.b = e.b[:0] -} +func (e *encbuf) reset() { e.b = e.b[:0] } +func (e *encbuf) get() []byte { return e.b } +func (e *encbuf) len() int { return len(e.b) } + +func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } +func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } +func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } + +func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } +func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } func (e *encbuf) putBE32(x uint32) { binary.BigEndian.PutUint32(e.c[:], x) e.b = append(e.b, e.c[:4]...) } -func (e *encbuf) putBE32int(x int) { - e.putBE32(uint32(x)) -} - -func (e *encbuf) putUvarint32(x uint32) { - e.putUvarint64(uint64(x)) -} - -func (e *encbuf) putUvarint(x int) { - e.putUvarint64(uint64(x)) +func (e *encbuf) putBE64(x uint64) { + binary.BigEndian.PutUint64(e.c[:], x) + e.b = append(e.b, e.c[:8]...) 
}

@@ -868,27 +932,84 @@ func (e *encbuf) putVarint64(x int64) {
 	e.b = append(e.b, e.c[:n]...)
 }
 
-func (e *encbuf) putString(s string) {
-	e.b = append(e.b, s...)
-}
-
-func (e *encbuf) putBytes(b []byte) {
-	e.b = append(e.b, b...)
+// putUvarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
+func (e *encbuf) putUvarintStr(s string) {
+	b := *(*[]byte)(unsafe.Pointer(&s))
+	e.putUvarint(len(b))
+	e.putString(s)
 }
 
+// putHash appends a hash over the buffer's current contents to the buffer.
 func (e *encbuf) putHash(h hash.Hash) {
 	h.Reset()
 	_, err := h.Write(e.b)
 	if err != nil {
 		panic(err) // The CRC32 implementation does not error
 	}
-	h.Sum(e.b)
+	e.b = h.Sum(e.b)
 }
 
-func (e *encbuf) get() []byte {
-	return e.b
+type decbuf struct {
+	b []byte
+	e error
 }
 
-func (e *encbuf) len() int {
-	return len(e.b)
+func (d *decbuf) readUvarintStr() string {
+	l := d.readUvarint()
+	if d.e != nil {
+		return ""
+	}
+	if len(d.b) < int(l) {
+		d.e = errInvalidSize
+		return ""
+	}
+	s := string(d.b[:l])
+	d.b = d.b[l:]
+	return s
+}
+
+func (d *decbuf) readUvarint() uint64 {
+	if d.e != nil {
+		return 0
+	}
+	x, n := binary.Uvarint(d.b)
+	if n < 1 {
+		d.e = errInvalidSize
+		return 0
+	}
+	d.b = d.b[n:]
+	return x
+}
+
+func (d *decbuf) readBE32() uint32 {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 4 {
+		d.e = errInvalidSize
+		return 0
+	}
+	x := binary.BigEndian.Uint32(d.b)
+	d.b = d.b[4:]
+	return x
+}
+
+func (d *decbuf) get(l int) decbuf {
+	if d.e != nil {
+		return decbuf{e: d.e}
+	}
+	if l > len(d.b) {
+		return decbuf{e: errInvalidSize}
+	}
+	r := decbuf{b: d.b[:l]}
+	d.b = d.b[l:]
+	return r
+}
+
+func (d *decbuf) err() error {
+	return d.e
+}
+
+func (d *decbuf) len() int {
+	return len(d.b)
 }