Don't buffer up tables in memory on compaction. (#6422)

We can instead write it as we go, and then go back and write in the
length at the end.

Also fix the compaction benchmark, which indicates no changes.

For the benchmark, this brings maximum memory usage of the buffers 
from ~200kB down to 128B.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-12-11 12:49:13 +00:00 committed by GitHub
parent 83cda784d2
commit 4df814f509
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 165 additions and 47 deletions

View file

@ -852,8 +852,10 @@ func BenchmarkCompaction(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err = c.Compact(dir, blockDirs, blocks)
testutil.Ok(b, err)
}
})
}
}

View file

@ -75,10 +75,20 @@ func (e *Encbuf) PutUvarintStr(s string) {
// PutHash appends a hash over the buffers current contents to the buffer.
func (e *Encbuf) PutHash(h hash.Hash) {
h.Reset()
e.WriteToHash(h)
e.PutHashSum(h)
}
// WriteToHash writes the current buffer contents to the given hash.
func (e *Encbuf) WriteToHash(h hash.Hash) {
_, err := h.Write(e.B)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
}
// PutHashSum writes the Sum of the given hash to the buffer.
func (e *Encbuf) PutHashSum(h hash.Hash) {
e.B = h.Sum(e.B)
}

View file

@ -236,6 +236,14 @@ func (w *Writer) write(bufs ...[]byte) error {
return nil
}
func (w *Writer) writeAt(buf []byte, pos uint64) error {
if err := w.fbuf.Flush(); err != nil {
return err
}
_, err := w.f.WriteAt(buf, int64(pos))
return err
}
// addPadding adds zero byte padding until the file size is a multiple size.
func (w *Writer) addPadding(size int) error {
p := w.pos % uint64(size)
@ -382,23 +390,42 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
}
sort.Strings(symbols)
w.buf1.Reset()
w.buf2.Reset()
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf2.PutBE32int(len(symbols))
w.buf1.Reset()
w.buf1.PutBE32int(len(symbols))
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
w.symbols = make(map[string]uint32, len(symbols))
for index, s := range symbols {
w.symbols[s] = uint32(index)
w.buf2.PutUvarintStr(s)
w.buf1.Reset()
w.buf1.PutUvarintStr(s)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
w.buf1.PutBE32int(w.buf2.Len())
w.buf2.PutHash(w.crc32)
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
err := w.write(w.buf1.Get(), w.buf2.Get())
return errors.Wrap(err, "write symbols")
w.buf1.Reset()
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
}
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
@ -425,9 +452,20 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
offset: w.pos,
})
w.buf2.Reset()
w.buf2.PutBE32int(len(names))
w.buf2.PutBE32int(valt.Len())
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf1.Reset()
w.buf1.PutBE32int(len(names))
w.buf1.PutBE32int(valt.Len())
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
// here we have an index for the symbol file if v2, otherwise it's an offset
for _, v := range valt.entries {
@ -435,55 +473,104 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
if !ok {
return errors.Errorf("symbol entry for %q does not exist", v)
}
w.buf2.PutBE32(index)
w.buf1.Reset()
w.buf1.PutBE32(index)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
w.buf2.PutHash(w.crc32)
err = w.write(w.buf1.Get(), w.buf2.Get())
return errors.Wrap(err, "write label index")
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
}
// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
w.buf2.Reset()
w.buf2.PutBE32int(len(w.labelIndexes))
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf1.Reset()
w.buf1.PutBE32int(len(w.labelIndexes))
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
for _, e := range w.labelIndexes {
w.buf2.PutUvarint(len(e.keys))
w.buf1.Reset()
w.buf1.PutUvarint(len(e.keys))
for _, k := range e.keys {
w.buf2.PutUvarintStr(k)
w.buf1.PutUvarintStr(k)
}
w.buf2.PutUvarint64(e.offset)
w.buf1.PutUvarint64(e.offset)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
w.buf2.PutHash(w.crc32)
return w.write(w.buf1.Get(), w.buf2.Get())
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
}
// writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error {
w.buf2.Reset()
w.buf2.PutBE32int(len(w.postings))
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf1.Reset()
w.buf1.PutBE32int(len(w.postings))
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
for _, e := range w.postings {
w.buf2.PutUvarint(2)
w.buf2.PutUvarintStr(e.name)
w.buf2.PutUvarintStr(e.value)
w.buf2.PutUvarint64(e.offset)
w.buf1.Reset()
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(e.name)
w.buf1.PutUvarintStr(e.value)
w.buf1.PutUvarint64(e.offset)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
w.buf2.PutHash(w.crc32)
return w.write(w.buf1.Get(), w.buf2.Get())
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
}
const indexTOCLen = 6*8 + 4
@ -539,21 +626,40 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
}
sort.Sort(uint32slice(refs))
w.buf2.Reset()
w.buf2.PutBE32int(len(refs))
startPos := w.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
w.crc32.Reset()
w.buf1.Reset()
w.buf1.PutBE32int(len(refs))
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
for _, r := range refs {
w.buf2.PutBE32(r)
w.buf1.Reset()
w.buf1.PutBE32(r)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
w.uint32s = refs
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(w.buf2.Len())
w.buf1.PutBE32int(int(w.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
w.buf2.PutHash(w.crc32)
err := w.write(w.buf1.Get(), w.buf2.Get())
return errors.Wrap(err, "write postings")
w.buf1.Reset()
w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get())
}
type uint32slice []uint32