Simplify label value index

This removes the flag from the label value index and serializes it using
encbufs.
TODO: move CRC32 checksum into label value index hash table for
referntial integrity.
This commit is contained in:
Fabian Reinartz 2017-04-25 19:01:25 +02:00
parent d30b181406
commit 0aad526d1a
2 changed files with 51 additions and 36 deletions

View file

@ -101,7 +101,25 @@ The CRC checksum is calculated over the series contents of the index concatenate
### Label Index ### Label Index
The label index indexes holds lists of possible values for label names. The label index indexes holds lists of possible values for label names. A sequence of label index blocks follow on the series entries.
```
┌─────────────────────────────────────────────────────────┐
│ len <varint>
├─────────────────────────────────────────────────────────┤
│ ┌──────────────────┬──────────────────────────────────┐ │
│ │ │ ┌──────────────────────────┐ │ │
│ │ │ │ ref(value[0]) <varint> │ │ │
│ │ │ ├──────────────────────────┤ │ │
│ │ n = len(names) │ │ ... │ ... │ │
│ │ <varint> │ ├──────────────────────────┤ │ │
│ │ │ │ ref(value[n]) <varint> │ │ │
│ │ │ └──────────────────────────┘ │ │
│ └──────────────────┴──────────────────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ CRC32 <4 byte>
└─────────────────────────────────────────────────────────┘
```
### Postings ### Postings

View file

@ -186,7 +186,6 @@ func (w *indexWriter) writeSymbols() error {
w.buf1.putBE32int(len(symbols)) w.buf1.putBE32int(len(symbols))
w.buf1.putBE32int(w.buf2.len()) w.buf1.putBE32int(w.buf2.len())
w.crc32.Reset()
w.buf2.putHash(w.crc32) w.buf2.putHash(w.crc32)
err := w.write(w.buf1.get(), w.buf2.get()) err := w.write(w.buf1.get(), w.buf2.get())
@ -244,7 +243,6 @@ func (w *indexWriter) writeSeries() error {
w.buf1.reset() w.buf1.reset()
w.buf1.putUvarint(w.buf2.len()) w.buf1.putUvarint(w.buf2.len())
w.crc32.Reset()
w.buf2.putHash(w.crc32) w.buf2.putHash(w.crc32)
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
@ -267,11 +265,16 @@ func (w *indexWriter) init() error {
return nil return nil
} }
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { func (w *indexWriter) ensureStarted() error {
if !w.started { if w.started {
if err := w.init(); err != nil { return nil
return err
} }
return w.init()
}
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if err := w.ensureStarted(); err != nil {
return errors.Wrap(err, "initialize")
} }
valt, err := newStringTuples(values, len(names)) valt, err := newStringTuples(values, len(names))
@ -285,35 +288,25 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
offset: uint32(w.pos), offset: uint32(w.pos),
}) })
buf := make([]byte, binary.MaxVarintLen32) w.buf2.reset()
n := binary.PutUvarint(buf, uint64(len(names))) w.buf2.putUvarint(len(names))
l := n + len(values)*4
w.b = append(w.b[:0], flagStd, 0, 0, 0, 0)
binary.BigEndian.PutUint32(w.b[1:], uint32(l))
w.b = append(w.b, buf[:n]...)
for _, v := range valt.s { for _, v := range valt.s {
binary.BigEndian.PutUint32(buf, w.symbols[v]) w.buf2.putBE32(w.symbols[v])
w.b = append(w.b, buf[:4]...)
} }
w.crc32.Reset() w.buf1.reset()
if _, err := w.crc32.Write(w.b[5:]); err != nil { w.buf1.putUvarint(w.buf2.len())
return errors.Wrap(err, "calculate label index CRC32 checksum")
}
w.b = w.crc32.Sum(w.b)
return w.write(w.b) w.buf2.putHash(w.crc32)
err = w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write label index")
} }
func (w *indexWriter) WritePostings(name, value string, it Postings) error { func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if !w.started { if err := w.ensureStarted(); err != nil {
if err := w.init(); err != nil { return errors.Wrap(err, "initialize")
return err
}
} }
key := name + string(sep) + value key := name + string(sep) + value
@ -597,20 +590,23 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist") //return nil, fmt.Errorf("label index doesn't exist")
} }
flag, b, err := r.section(off) b := r.b[off:]
if err != nil {
return nil, errors.Wrapf(err, "section at %d", off)
}
if flag != flagStd {
return nil, errInvalidFlag
}
l, n := binary.Uvarint(b) l, n := binary.Uvarint(b)
if n < 0 {
return nil, errors.New("reading symbol length failed")
}
if int(l) > len(b[n:]) {
return nil, errInvalidSize
}
b = b[n : n+int(l)]
c, n := binary.Uvarint(b)
if n < 1 { if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read label index size") return nil, errors.Wrap(errInvalidSize, "read label index size")
} }
st := &serializedStringTuples{ st := &serializedStringTuples{
l: int(l), l: int(c),
b: b[n:], b: b[n:],
lookup: r.lookupSymbol, lookup: r.lookupSymbol,
} }
@ -853,6 +849,7 @@ func (e *encbuf) putBytes(b []byte) {
} }
func (e *encbuf) putHash(h hash.Hash) { func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b) _, err := h.Write(e.b)
if err != nil { if err != nil {
panic(err) // The CRC32 implementation does not error panic(err) // The CRC32 implementation does not error