Write label indices based on the posting offset table.

This avoids having to build it up in RAM, and means that all variable
memory usage for compactions is now 0.25 bytes per symbol plus a few
O(labelnames) structures. So in practice, pretty close to constant
memory for compactions.

benchmark                                                                                   old ns/op       new ns/op       delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4        662974828       667162981       +0.63%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4       2459590377      2131168138      -13.35%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4       3808280548      3919290378      +2.91%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4       8513884311      8738099339      +2.63%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4      1898843003      1944131966      +2.39%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4     5601478437      6031391658      +7.67%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4     11225096097     11359624463     +1.20%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4     23994637282     23919583343     -0.31%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                               891042098       826898358       -7.20%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                               915949138       902555676       -1.46%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                               955138431       879067946       -7.96%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                               991447640       958785968       -3.29%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                               1068729356      980249080       -8.28%

benchmark                                                                                   old allocs     new allocs     delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4        470778         470556         -0.05%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4       791429         791225         -0.03%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4       1111514        1111257        -0.02%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4       2111498        2111369        -0.01%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4      841433         841220         -0.03%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4     1911469        1911202        -0.01%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4     3041558        3041328        -0.01%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4     6741534        6741382        -0.00%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                               824856         820873         -0.48%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                               887220         885180         -0.23%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                               905253         901539         -0.41%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                               925148         913632         -1.24%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                               1019141        978727         -3.97%

benchmark                                                                                   old bytes      new bytes      delta
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4        35694744       41523836       +16.33%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4       53405264       59499056       +11.41%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4       74160320       78151568       +5.38%
BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4       120878480      135364672      +11.98%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4      203832448      209925504      +2.99%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4     341029208      346551064      +1.62%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4     580217176      582345224      +0.37%
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4     1356872288     1363495368     +0.49%
BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4                               119535672      94815920       -20.68%
BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4                               115352280      95980776       -16.79%
BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4                               119472320      98724460       -17.37%
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4                               111979312      94325456       -15.77%
BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4                               116628584      98566344       -15.49%

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-12-18 01:29:41 +00:00
parent 7d1aad46b8
commit 2b653ee230

View file

@ -15,6 +15,7 @@ package index
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"hash" "hash"
@ -116,8 +117,9 @@ type Writer struct {
fPO *fileWriter fPO *fileWriter
cntPO uint64 cntPO uint64
toc TOC toc TOC
stage indexWriterStage stage indexWriterStage
postingsStart uint64 // Due to padding, can differ from TOC entry.
// Reusable memory. // Reusable memory.
buf1 encoding.Encbuf buf1 encoding.Encbuf
@ -128,9 +130,8 @@ type Writer struct {
symbolFile *fileutil.MmapFile symbolFile *fileutil.MmapFile
lastSymbol string lastSymbol string
labelIndexes []labelIndexHashEntry // Label index offsets. labelIndexes []labelIndexHashEntry // Label index offsets.
labelValues map[string]map[uint32]struct{} // Label names, and their values's symbol indexes. labelNames map[string]uint64 // Label names, and their usage.
labelNames map[string]uint64 // Label names, and their usage.
// Hold last series to validate that clients insert new series in order. // Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels lastSeries labels.Labels
@ -223,9 +224,8 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
labelNames: make(map[string]uint64, 1<<8), labelNames: make(map[string]uint64, 1<<8),
labelValues: make(map[string]map[uint32]struct{}, 1<<8), crc32: newCRC32(),
crc32: newCRC32(),
} }
if err := iw.writeMeta(); err != nil { if err := iw.writeMeta(); err != nil {
return nil, err return nil, err
@ -277,7 +277,7 @@ func (fw *fileWriter) write(bufs ...[]byte) error {
// Once we move to compressed/varint representations in those areas, this limitation // Once we move to compressed/varint representations in those areas, this limitation
// can be lifted. // can be lifted.
if fw.pos > 16*math.MaxUint32 { if fw.pos > 16*math.MaxUint32 {
return errors.Errorf("exceeding max size of 64GiB") return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
} }
} }
return nil return nil
@ -331,9 +331,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
if w.stage == s { if w.stage == s {
return nil return nil
} }
if w.stage+1 < s { if w.stage < s-1 {
// A stage has been skipped. // A stage has been skipped.
w.ensureStage(s - 1) if err := w.ensureStage(s - 1); err != nil {
return err
}
} }
if w.stage > s { if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
@ -420,7 +422,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
w.buf2.PutUvarint(len(lset)) w.buf2.PutUvarint(len(lset))
for _, l := range lset { for _, l := range lset {
// here we have an index for the symbol file if v2, otherwise it's an offset
index, err := w.symbols.ReverseLookup(l.Name) index, err := w.symbols.ReverseLookup(l.Name)
if err != nil { if err != nil {
return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
@ -433,11 +434,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
} }
w.buf2.PutUvarint32(index) w.buf2.PutUvarint32(index)
if _, ok := w.labelValues[l.Name]; !ok {
w.labelValues[l.Name] = map[uint32]struct{}{}
}
w.labelValues[l.Name][index] = struct{}{}
} }
w.buf2.PutUvarint(len(chunks)) w.buf2.PutUvarint(len(chunks))
@ -536,19 +532,52 @@ func (w *Writer) finishSymbols() error {
} }
func (w *Writer) writeLabelIndices() error { func (w *Writer) writeLabelIndices() error {
names := make([]string, 0, len(w.labelValues)) if err := w.fPO.flush(); err != nil {
for n := range w.labelValues { return err
names = append(names, n)
} }
sort.Strings(names)
for _, n := range names { // Find all the label values in the tmp posting offset table.
values := make([]uint32, 0, len(w.labelValues[n])) f, err := fileutil.OpenMmapFile(w.fPO.name)
for v := range w.labelValues[n] { if err != nil {
values = append(values, v) return err
}
defer f.Close()
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
for d.Err() == nil && cnt > 0 {
cnt--
d.Uvarint() // Keycount.
name := d.UvarintBytes() // Label name.
value := yoloString(d.UvarintBytes()) // Label value.
d.Uvarint64() // Offset.
if len(name) == 0 {
continue // All index is ignored.
} }
sort.Sort(uint32slice(values))
if err := w.writeLabelIndex(n, values); err != nil { if !bytes.Equal(name, current) && len(values) > 0 {
// We've reached a new label name.
if err := w.writeLabelIndex(string(current), values); err != nil {
return err
}
values = values[:0]
}
current = name
sid, err := w.symbols.ReverseLookup(value)
if err != nil {
return err
}
values = append(values, sid)
}
if d.Err() != nil {
return d.Err()
}
// Handle the last label.
if len(values) > 0 {
if err := w.writeLabelIndex(string(current), values); err != nil {
return err return err
} }
} }
@ -657,7 +686,7 @@ func (w *Writer) writePostingsOffsetTable() error {
// Copy over the tmp posting offset table, however we need to // Copy over the tmp posting offset table, however we need to
// adjust the offsets. // adjust the offsets.
adjustment := w.toc.Postings adjustment := w.postingsStart
w.buf1.Reset() w.buf1.Reset()
w.crc32.Reset() w.crc32.Reset()
@ -888,10 +917,11 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
} }
func (w *Writer) writePostings() error { func (w *Writer) writePostings() error {
// There's padding in the tmp filem make sure it actually works. // There's padding in the tmp file, make sure it actually works.
if err := w.f.addPadding(4); err != nil { if err := w.f.addPadding(4); err != nil {
return err return err
} }
w.postingsStart = w.f.pos
// Copy temporary file into main index. // Copy temporary file into main index.
if err := w.fP.flush(); err != nil { if err := w.fP.flush(); err != nil {
@ -910,6 +940,9 @@ func (w *Writer) writePostings() error {
} }
w.f.pos += uint64(n) w.f.pos += uint64(n)
if err := w.fP.close(); err != nil {
return err
}
if err := w.fP.remove(); err != nil { if err := w.fP.remove(); err != nil {
return err return err
} }
@ -928,11 +961,6 @@ type labelIndexHashEntry struct {
offset uint64 offset uint64
} }
type postingsHashEntry struct {
name, value string
offset uint64
}
func (w *Writer) Close() error { func (w *Writer) Close() error {
if err := w.ensureStage(idxStageDone); err != nil { if err := w.ensureStage(idxStageDone); err != nil {
return err return err