Change offset table layout, add TOC, ...

This commit is contained in:
Fabian Reinartz 2017-04-26 18:01:13 +02:00
parent 8b1f514a2d
commit 35b62f001e
4 changed files with 434 additions and 263 deletions

View file

@ -3,11 +3,11 @@
The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block.
``` ```
┌─────────────────────────────┬─────────────────────┐ ┌─────────────────────────────┬─────────────────────┐
│ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte> │ magic(0x85BD40DD) <4 byte> │ version(1) <1 byte>
├─────────────────────────────┴─────────────────────┤ ├─────────────────────────────┴─────────────────────┤
│ ┌──────────────┬───────────────────┬────────┐ │ │ ┌──────────────┬───────────────────┬────────┐ │
│ │ len <varint> │ encoding <1 byte> │ data │ ... │ │ │ len <varint> │ encoding <1 byte> │ data │ ... │
│ └──────────────┴───────────────────┴────────┘ │ │ └──────────────┴───────────────────┴────────┘ │
└───────────────────────────────────────────────────┘ └───────────────────────────────────────────────────┘
``` ```

View file

@ -1,27 +1,35 @@
# Chunks Disk Format # Index Disk Format
The following describes the format of the `index` file found in each block directory. The following describes the format of the `index` file found in each block directory.
``` ```
┌────────────────────────────┬─────────────────────┐ ┌────────────────────────────┬─────────────────────┐
│ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte> │ magic(0xBAAAD700) <4 byte> │ version(1) <1 byte>
├────────────────────────────┴─────────────────────┤ ├────────────────────────────┴─────────────────────┤
│ ┌──────────────────────────────────────────────┐ │ │ ┌──────────────────────────────────────────────┐ │
│ │ Symbol Table │ │ │ │ Symbol Table │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Series │ │ │ │ Series │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Label Index │ │ │ │ Label Index 1 │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Postings │ │ │ │ ... │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │ │ │ Label Index N │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │ │ │ Label Index Table │ │
│ ├──────────────────────────────────────────────┤ │ │ ├──────────────────────────────────────────────┤ │
│ │ Body ... │ │ │ │ Postings 1 │ │
│ └──────────────────────────────────────────────┘ │ │ ├──────────────────────────────────────────────┤ │
└──────────────────────────────────────────────────┘ │ │ ... │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings N │ │
│ ├──────────────────────────────────────────────┤ │
│ │ Postings Table │ │
│ ├──────────────────────────────────────────────┤ │
│ │ TOC │ │
│ └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
``` ```
@ -33,19 +41,19 @@ The section contains a sequence of the raw string data, each prefixed with the s
Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order. Strings are referenced by pointing to the beginning of their length field. The strings are sorted in lexicographically ascending order.
``` ```
┌─────────────────────────┬───────────────┐ ┌─────────────────────────┬───────────────┐
│ count(symbols) <4 byte> │ len <4 byte> │ count(symbols) <4 byte> │ len <4 byte>
├─────────────────────────┴───────────────┤ ├─────────────────────────┴───────────────┤
│ ┌─────────────────────┬───────────────┐ │ │ ┌─────────────────────┬───────────────┐ │
│ │ len(str_1) <varint> │ str_1 <bytes> │ │ │ │ len(str_1) <varint> │ str_1 <bytes> │ │
│ ├─────────────────────┴───────────────┤ │ │ ├─────────────────────┴───────────────┤ │
│ │ . . . │ │ │ │ . . . │ │
│ ├─────────────────────┬───────────────┤ │ │ ├─────────────────────┬───────────────┤ │
│ │ len(str_n) <varint> │ str_1 <bytes> │ │ │ │ len(str_n) <varint> │ str_1 <bytes> │ │
│ └─────────────────────┴───────────────┘ │ │ └─────────────────────┴───────────────┘ │
├─────────────────────────────────────────┤ ├─────────────────────────────────────────┤
│ CRC32 <4 byte> │ CRC32 <4 byte>
└─────────────────────────────────────────┘ └─────────────────────────────────────────┘
``` ```
@ -55,45 +63,45 @@ The section contains a sequence of series that hold the label set of the series
The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets.
``` ```
┌───────────────────────────────────────┐ ┌───────────────────────────────────────┐
│ count(series) <4 byte> │ count(series) <4 byte>
├───────────────────────────────────────┤ ├───────────────────────────────────────┤
│ ┌───────────────────────────────────┐ │ │ ┌───────────────────────────────────┐ │
│ │ series_1 │ │ │ │ series_1 │ │
│ ├───────────────────────────────────┤ │ │ ├───────────────────────────────────┤ │
│ │ . . . │ │ │ │ . . . │ │
│ ├───────────────────────────────────┤ │ │ ├───────────────────────────────────┤ │
│ │ series_n │ │ │ │ series_n │ │
│ └───────────────────────────────────┘ │ │ └───────────────────────────────────┘ │
└───────────────────────────────────────┘ └───────────────────────────────────────┘
``` ```
Every series holds a list of label pairs and chunks. The label pairs reference the symbol table and the chunks an address in one of the block's chunk files. Every series holds a list of label pairs and chunks. The label pairs reference the symbol table and the chunks an address in one of the block's chunk files.
``` ```
┌─────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────┐
│ len <varint> │ len <varint>
├─────────────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────┐ │ │ ┌──────────────────┬──────────────────────────────────┐ │
│ │ │ ┌──────────────────────────┐ │ │ │ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ ref(l_i.name) <varint> │ │ │ │ │ │ │ ref(l_i.name) <varint> │ │ │
│ │ #labels <varint> │ ├──────────────────────────┤ ... │ │ │ │ #labels <varint> │ ├──────────────────────────┤ ... │ │
│ │ │ │ ref(l_i.value) <varint> │ │ │ │ │ │ │ ref(l_i.value) <varint> │ │ │
│ │ │ └──────────────────────────┘ │ │ │ │ │ └──────────────────────────┘ │ │
│ ├──────────────────┼──────────────────────────────────┤ │ │ ├──────────────────┼──────────────────────────────────┤ │
│ │ │ ┌──────────────────────────┐ │ │ │ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ c_i.mint <varint> │ │ │ │ │ │ │ c_i.mint <varint> │ │ │
│ │ │ ├──────────────────────────┤ │ │ │ │ │ ├──────────────────────────┤ │ │
│ │ │ │ c_i.maxt <varint> │ │ │ │ │ │ │ c_i.maxt <varint> │ │ │
│ │ #chunks <varint> │ ├──────────────────────────┤ ... │ │ │ │ #chunks <varint> │ ├──────────────────────────┤ ... │ │
│ │ │ │ ref(c_i.data) <varint> │ │ │ │ │ │ │ ref(c_i.data) <varint> │ │ │
│ │ │ ├──────────────────────────┤ │ │ │ │ │ ├──────────────────────────┤ │ │
│ │ │ │ crc32(c_i.data) <varint> │ │ │ │ │ │ │ crc32(c_i.data) <varint> │ │ │
│ │ │ └──────────────────────────┘ │ │ │ │ │ └──────────────────────────┘ │ │
│ └──────────────────┴──────────────────────────────────┘ │ │ └──────────────────┴──────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────────────┤
│ CRC32 <4 byte> │ CRC32 <4 byte>
└─────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────┘
``` ```
The CRC checksum is calculated over the series contents of the index concatenated with the data of its chunks (with encoding byte, without length). The CRC checksum is calculated over the series contents of the index concatenated with the data of its chunks (with encoding byte, without length).
@ -104,43 +112,87 @@ The CRC checksum is calculated over the series contents of the index concatenate
The label index indexes holds lists of possible values for label names. A sequence of label index blocks follow on the series entries. The label index indexes holds lists of possible values for label names. A sequence of label index blocks follow on the series entries.
``` ```
┌─────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────┐
│ len <varint> │ len <varint>
├─────────────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────┐ │ │ ┌──────────────────┬──────────────────────────────────┐ │
│ │ │ ┌──────────────────────────┐ │ │ │ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ ref(value[0]) <varint> │ │ │ │ │ │ │ ref(value[0]) <4 byte> │ │ │
│ │ │ ├──────────────────────────┤ │ │ │ │ │ ├──────────────────────────┤ │ │
│ │ n = len(names) │ │ ... │ ... │ │ │ │ n = len(names) │ │ ... │ ... │ │
│ │ <varint> │ ├──────────────────────────┤ │ │ │ │ <varint> │ ├──────────────────────────┤ │ │
│ │ │ │ ref(value[n]) <varint> │ │ │ │ │ │ │ ref(value[n]) <4 byte> │ │ │
│ │ │ └──────────────────────────┘ │ │ │ │ │ └──────────────────────────┘ │ │
│ └──────────────────┴──────────────────────────────────┘ │ │ └──────────────────┴──────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────────────┤
│ CRC32 <4 byte> │ CRC32 <4 byte>
└─────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────┘
``` ```
The sequence of label index blocks is finalized by a lookup table pointing to the beginning of each label index block. It is simply a list of entries that are read into an in-memory hashmap when the index is loaded.
### Postings ### Postings
Postings are postings lists that map label pairs to series they occur in. Postings are postings lists that map label pairs to series they occur in.
``` ```
┌─────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────┐
│ len <varint> │ len <varint>
├─────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────┐ │ │ ┌─────────────────────────────────────────────┐ │
│ │ ref(series[0]) <4 byte> │ │ │ │ ref(series[0]) <4 byte> │ │
│ ├─────────────────────────────────────────────┤ │ │ ├─────────────────────────────────────────────┤ │
│ │ ... │ │ │ │ ... │ │
│ ├─────────────────────────────────────────────┤ │ │ ├─────────────────────────────────────────────┤ │
│ │ ref(series[n]) <4 byte> │ │ │ │ ref(series[n]) <4 byte> │ │
│ └─────────────────────────────────────────────┘ │ │ └─────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────┤ ├─────────────────────────────────────────────────┤
│ CRC32 <4 byte> │ CRC32 <4 byte>
└─────────────────────────────────────────────────┘ └─────────────────────────────────────────────────┘
```
### Offset Table
```
┌─────────────────────────┬───────────────┐
│ count(symbols) <4 byte> │ len <4 byte>
├─────────────────────────┴───────────────┤
│ ┌─────────────────────────────────────┐ │
│ │ n = len(strs) <varint> │ │
│ ├─────────────────────────────────────┤ │
│ │ len(strs[0]) │ │
│ ├─────────────────────────────────────┤ │
│ │ ... │ │
│ ├─────────────────────────────────────┤ │
│ │ strs[n] │ │
│ ├─────────────────────────────────────┤ │
│ │ offset <varint> │ │
│ └─────────────────────────────────────┘ │
│ . . . │
├─────────────────────────────────────────┤
│ CRC32 <4 byte>
└─────────────────────────────────────────┘
``` ```
###
### TOC
The table of contents serves as an entry point to the entire index. It's size is fixed.
```
┌─────────────────────────────────────────────┐
│ ref(symbols) <8 byte>
├─────────────────────────────────────────────┤
│ ref(series) <8 byte>
├─────────────────────────────────────────────┤
│ ref(label indices) <8 byte>
├─────────────────────────────────────────────┤
│ ref(label indices table) <8 byte>
├─────────────────────────────────────────────┤
│ ref(postings) <8 byte>
├─────────────────────────────────────────────┤
│ ref(postings table) <8 byte>
└─────────────────────────────────────────────┘
```

2
db.go
View file

@ -95,8 +95,6 @@ type Appender interface {
Rollback() error Rollback() error
} }
const sep = '\xff'
// DB handles reads and writes of time series falling into // DB handles reads and writes of time series falling into
// a hashed partition of a seriedb. // a hashed partition of a seriedb.
type DB struct { type DB struct {

437
index.go
View file

@ -24,6 +24,9 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
"unsafe"
"math"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -35,8 +38,6 @@ const (
MagicIndex = 0xBAAAD700 MagicIndex = 0xBAAAD700
indexFormatV1 = 1 indexFormatV1 = 1
indexSeriesFormatV1 = 1
) )
const compactionPageBytes = minSectorSize * 64 const compactionPageBytes = minSectorSize * 64
@ -104,15 +105,16 @@ type IndexWriter interface {
// indexWriter implements the IndexWriter interface for the standard // indexWriter implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type indexWriter struct { type indexWriter struct {
f *os.File f *os.File
fbuf *bufio.Writer fbuf *bufio.Writer
pos int pos uint64
toc indexTOC
stage indexWriterStage stage indexWriterStage
// Reusable memory. // Reusable memory.
buf1 encbuf buf1 encbuf
buf2 encbuf buf2 encbuf
b []byte
uint32s []uint32 uint32s []uint32
series map[uint32]*indexWriterSeries series map[uint32]*indexWriterSeries
@ -123,6 +125,15 @@ type indexWriter struct {
crc32 hash.Hash crc32 hash.Hash
} }
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
}
func newIndexWriter(dir string) (*indexWriter, error) { func newIndexWriter(dir string) (*indexWriter, error) {
df, err := fileutil.OpenDir(dir) df, err := fileutil.OpenDir(dir)
if err != nil { if err != nil {
@ -145,7 +156,6 @@ func newIndexWriter(dir string) (*indexWriter, error) {
// Reusable memory. // Reusable memory.
buf1: encbuf{b: make([]byte, 0, 1<<22)}, buf1: encbuf{b: make([]byte, 0, 1<<22)},
buf2: encbuf{b: make([]byte, 0, 1<<22)}, buf2: encbuf{b: make([]byte, 0, 1<<22)},
b: make([]byte, 0, 1<<23),
uint32s: make([]uint32, 0, 1<<15), uint32s: make([]uint32, 0, 1<<15),
// Caches. // Caches.
@ -162,21 +172,75 @@ func newIndexWriter(dir string) (*indexWriter, error) {
func (w *indexWriter) write(bufs ...[]byte) error { func (w *indexWriter) write(bufs ...[]byte) error {
for _, b := range bufs { for _, b := range bufs {
n, err := w.fbuf.Write(b) n, err := w.fbuf.Write(b)
w.pos += n w.pos += uint64(n)
if err != nil { if err != nil {
return err return err
} }
// For now the index file must not grow beyond 4GiB. Some of the fixed-sized
// offset references in v1 are only 4 byte large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if w.pos > math.MaxUint32 {
return errors.Errorf("exceeding max size of 4GiB")
}
} }
return nil return nil
} }
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *indexWriter) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
// Complete population stage by writing symbols and series.
if w.stage == idxStagePopulate {
w.toc.symbols = w.pos
if err := w.writeSymbols(); err != nil {
return err
}
w.toc.series = w.pos
if err := w.writeSeries(); err != nil {
return err
}
}
// Mark start of sections in table of contents.
switch s {
case idxStageLabelIndex:
w.toc.labelIndices = w.pos
case idxStagePostings:
w.toc.labelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postings = w.pos
case idxStageDone:
w.toc.postingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
if err := w.writeTOC(); err != nil {
return err
}
}
w.stage = s
return nil
}
func (w *indexWriter) writeMeta() error { func (w *indexWriter) writeMeta() error {
b := [5]byte{} w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV1)
binary.BigEndian.PutUint32(b[:4], MagicIndex) return w.write(w.buf1.get())
b[4] = flagStd
return w.write(b[:])
} }
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
@ -210,10 +274,12 @@ func (w *indexWriter) writeSymbols() error {
w.buf2.reset() w.buf2.reset()
for _, s := range symbols { for _, s := range symbols {
w.symbols[s] = uint32(w.pos + headerSize + w.buf2.len()) w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
w.buf2.putUvarint(len(s)) // NOTE: len(s) gives the number of runes, not the number of bytes.
w.buf2.putString(s) // Therefore the read-back length for strings with unicode characters will
// be off when not using putCstr.
w.buf2.putUvarintStr(s)
} }
w.buf1.putBE32int(len(symbols)) w.buf1.putBE32int(len(symbols))
@ -277,28 +343,6 @@ func (w *indexWriter) writeSeries() error {
return nil return nil
} }
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *indexWriter) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
if w.stage == idxStagePopulate {
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
w.stage = s
return nil
}
return nil
}
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if err := w.ensureStage(idxStageLabelIndex); err != nil { if err := w.ensureStage(idxStageLabelIndex); err != nil {
return errors.Wrap(err, "ensure stage") return errors.Wrap(err, "ensure stage")
@ -311,8 +355,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
sort.Sort(valt) sort.Sort(valt)
w.labelIndexes = append(w.labelIndexes, hashEntry{ w.labelIndexes = append(w.labelIndexes, hashEntry{
name: strings.Join(names, string(sep)), keys: names,
offset: uint32(w.pos), offset: w.pos,
}) })
w.buf2.reset() w.buf2.reset()
@ -331,16 +375,52 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
return errors.Wrap(err, "write label index") return errors.Wrap(err, "write label index")
} }
// writeOffsetTable writes a sequence of readable hash entries.
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
w.buf1.reset()
w.buf1.putBE32int(len(entries))
w.buf2.reset()
for _, e := range entries {
w.buf2.putUvarint(len(e.keys))
for _, k := range e.keys {
w.buf2.putUvarintStr(k)
}
w.buf2.putUvarint64(e.offset)
}
w.buf1.putBE32int(w.buf2.len())
w.buf2.putHash(w.crc32)
return w.write(w.buf1.get(), w.buf2.get())
}
const indexTOCLen = 6*8 + 4
func (w *indexWriter) writeTOC() error {
w.buf1.reset()
w.buf1.putBE64(w.toc.symbols)
w.buf1.putBE64(w.toc.series)
w.buf1.putBE64(w.toc.labelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable)
w.buf1.putBE64(w.toc.postings)
w.buf1.putBE64(w.toc.postingsTable)
w.buf1.putHash(w.crc32)
return w.write(w.buf1.get())
}
func (w *indexWriter) WritePostings(name, value string, it Postings) error { func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if err := w.ensureStage(idxStagePostings); err != nil { if err := w.ensureStage(idxStagePostings); err != nil {
return errors.Wrap(err, "ensure stage") return errors.Wrap(err, "ensure stage")
} }
key := name + string(sep) + value
w.postings = append(w.postings, hashEntry{ w.postings = append(w.postings, hashEntry{
name: key, keys: []string{name, value},
offset: uint32(w.pos), offset: w.pos,
}) })
// Order of the references in the postings list does not imply order // Order of the references in the postings list does not imply order
@ -382,62 +462,12 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
type hashEntry struct { type hashEntry struct {
name string keys []string
offset uint32 offset uint64
}
func (w *indexWriter) writeHashmap(h []hashEntry) error {
w.b = append(w.b[:0], flagStd, 0, 0, 0, 0)
buf := [binary.MaxVarintLen32]byte{}
for _, e := range h {
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
w.b = append(w.b, buf[:n]...)
w.b = append(w.b, e.name...)
n = binary.PutUvarint(buf[:], uint64(e.offset))
w.b = append(w.b, buf[:n]...)
}
binary.BigEndian.PutUint32(w.b[1:], uint32(len(w.b)-5))
w.crc32.Reset()
if _, err := w.crc32.Write(w.b[5:]); err != nil {
return errors.Wrap(err, "calculate label index CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
return w.write(w.b)
}
func (w *indexWriter) finalize() error {
if err := w.ensureStage(idxStageDone); err != nil {
return errors.Wrap(err, "ensure stage")
}
// Write out hash maps to jump to correct label index and postings sections.
lo := uint32(w.pos)
if err := w.writeHashmap(w.labelIndexes); err != nil {
return err
}
po := uint32(w.pos)
if err := w.writeHashmap(w.postings); err != nil {
return err
}
// Terminate index file with offsets to hashmaps. This is the entry Pointer
// for any index query.
// TODO(fabxc): also store offset to series section to allow plain
// iteration over all existing series?
b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po)
return w.write(b[:])
} }
func (w *indexWriter) Close() error { func (w *indexWriter) Close() error {
if err := w.finalize(); err != nil { if err := w.ensureStage(idxStageDone); err != nil {
return err return err
} }
if err := w.fbuf.Flush(); err != nil { if err := w.fbuf.Flush(); err != nil {
@ -478,7 +508,8 @@ type StringTuples interface {
type indexReader struct { type indexReader struct {
// The underlying byte slice holding the encoded series data. // The underlying byte slice holding the encoded series data.
b []byte b []byte
toc indexTOC
// Close that releases the underlying resources of the byte slice. // Close that releases the underlying resources of the byte slice.
c io.Closer c io.Closer
@ -509,57 +540,81 @@ func newIndexReader(dir string) (*indexReader, error) {
return nil, errors.Errorf("invalid magic number %x", m) return nil, errors.Errorf("invalid magic number %x", m)
} }
// The last two 4 bytes hold the pointers to the hashmaps. if err := r.readTOC(); err != nil {
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) return nil, errors.Wrap(err, "read TOC")
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) }
flag, b, err := r.section(loff) r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) return nil, errors.Wrap(err, "read label index table")
} }
if r.labels, err = readHashmap(flag, b); err != nil { r.postings, err = r.readOffsetTable(r.toc.postingsTable)
return nil, errors.Wrap(err, "read label index hashmap")
}
flag, b, err = r.section(poff)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) return nil, errors.Wrap(err, "read postings table")
}
if r.postings, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read postings hashmap")
} }
return r, nil return r, nil
} }
func readHashmap(flag byte, b []byte) (map[string]uint32, error) { func (r *indexReader) readTOC() error {
if flag != flagStd { if len(r.b) < indexTOCLen {
return nil, errInvalidFlag return errInvalidSize
} }
h := make(map[string]uint32, 512) b := r.b[len(r.b)-indexTOCLen:]
for len(b) > 0 { r.toc.symbols = binary.BigEndian.Uint64(b[0:8])
l, n := binary.Uvarint(b) r.toc.series = binary.BigEndian.Uint64(b[8:16])
if n < 1 { r.toc.labelIndices = binary.BigEndian.Uint64(b[16:24])
return nil, errors.Wrap(errInvalidSize, "read key length") r.toc.labelIndicesTable = binary.BigEndian.Uint64(b[24:32])
} r.toc.postings = binary.BigEndian.Uint64(b[32:40])
b = b[n:] r.toc.postingsTable = binary.BigEndian.Uint64(b[40:48])
if len(b) < int(l) { // TODO(fabxc): validate checksum.
return nil, errors.Wrap(errInvalidSize, "read key")
}
s := string(b[:l])
b = b[l:]
o, n := binary.Uvarint(b) return nil
if n < 1 { }
return nil, errors.Wrap(errInvalidSize, "read offset value")
}
b = b[n:]
h[s] = uint32(o) func (r *indexReader) decbufAt(off int) decbuf {
if len(r.b) < off {
return decbuf{e: errInvalidSize}
}
return decbuf{b: r.b[off:]}
}
// readOffsetTable reads an offset table at the given position and returns a map
// with the key strings concatenated by the 0xff unicode non-character.
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
// A table might not have been written at all, in which case the position
// is zeroed out.
if off == 0 {
return nil, nil
} }
return h, nil const sep = "\xff"
var (
d1 = r.decbufAt(int(off))
cnt = d1.readBE32()
el = d1.readBE32()
d2 = d1.get(int(el))
)
res := make(map[string]uint32, 512)
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
keyCount := int(d2.readUvarint())
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
keys = append(keys, d2.readUvarintStr())
}
res[strings.Join(keys, sep)] = uint32(d2.readUvarint())
cnt--
}
// TODO(fabxc): verify checksum from remainer of d1.
return res, d2.err()
} }
func (r *indexReader) Close() error { func (r *indexReader) Close() error {
@ -619,7 +674,9 @@ func (r *indexReader) getSized(off uint32) ([]byte, error) {
} }
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
key := strings.Join(names, string(sep)) const sep = "\xff"
key := strings.Join(names, sep)
off, ok := r.labels[key] off, ok := r.labels[key]
if !ok { if !ok {
// XXX(fabxc): hot fix. Should return a partial data error and handle cases // XXX(fabxc): hot fix. Should return a partial data error and handle cases
@ -652,6 +709,8 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 } func (emptyStringTuples) Len() int { return 0 }
func (r *indexReader) LabelIndices() ([][]string, error) { func (r *indexReader) LabelIndices() ([][]string, error) {
const sep = "\xff"
res := [][]string{} res := [][]string{}
for s := range r.labels { for s := range r.labels {
@ -744,6 +803,8 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
} }
func (r *indexReader) Postings(name, value string) (Postings, error) { func (r *indexReader) Postings(name, value string) (Postings, error) {
const sep = "\xff"
key := name + string(sep) + value key := name + string(sep) + value
off, ok := r.postings[key] off, ok := r.postings[key]
@ -832,30 +893,33 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
return res, nil return res, nil
} }
// enbuf is a helper type to populate a byte slice with various types.
type encbuf struct { type encbuf struct {
b []byte b []byte
c [binary.MaxVarintLen64]byte c [binary.MaxVarintLen64]byte
} }
func (e *encbuf) reset() { func (e *encbuf) reset() { e.b = e.b[:0] }
e.b = e.b[:0] func (e *encbuf) get() []byte { return e.b }
} func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) { func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x) binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...) e.b = append(e.b, e.c[:4]...)
} }
func (e *encbuf) putBE32int(x int) { func (e *encbuf) putBE64(x uint64) {
e.putBE32(uint32(x)) binary.BigEndian.PutUint64(e.c[:], x)
} e.b = append(e.b, e.c[:8]...)
func (e *encbuf) putUvarint32(x uint32) {
e.putUvarint64(uint64(x))
}
func (e *encbuf) putUvarint(x int) {
e.putUvarint64(uint64(x))
} }
func (e *encbuf) putUvarint64(x uint64) { func (e *encbuf) putUvarint64(x uint64) {
@ -868,27 +932,84 @@ func (e *encbuf) putVarint64(x int64) {
e.b = append(e.b, e.c[:n]...) e.b = append(e.b, e.c[:n]...)
} }
func (e *encbuf) putString(s string) { // putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
e.b = append(e.b, s...) func (e *encbuf) putUvarintStr(s string) {
} b := *(*[]byte)(unsafe.Pointer(&s))
e.putUvarint(len(b))
func (e *encbuf) putBytes(b []byte) { e.putString(s)
e.b = append(e.b, b...)
} }
// putHash appends a hash over the buffers current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) { func (e *encbuf) putHash(h hash.Hash) {
h.Reset() h.Reset()
_, err := h.Write(e.b) _, err := h.Write(e.b)
if err != nil { if err != nil {
panic(err) // The CRC32 implementation does not error panic(err) // The CRC32 implementation does not error
} }
h.Sum(e.b) e.b = h.Sum(e.b)
} }
func (e *encbuf) get() []byte { type decbuf struct {
return e.b b []byte
e error
} }
func (e *encbuf) len() int { func (d *decbuf) readUvarintStr() string {
return len(e.b) l := d.readUvarint()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := string(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) readUvarint() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) readBE32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) get(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error {
return d.e
}
func (d *decbuf) len() int {
return len(d.b)
} }