During compaction spool postings offset table on the side.

This avoids building it up in memory.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-12-17 21:16:56 +00:00
parent 31700a05df
commit 85964ce567

View file

@ -116,6 +116,12 @@ type Writer struct {
fbuf *bufio.Writer fbuf *bufio.Writer
pos uint64 pos uint64
// Temporary file for posting offsets table.
fPO *os.File
fbufPO *bufio.Writer
posPO uint64
cntPO uint64
toc TOC toc TOC
stage indexWriterStage stage indexWriterStage
@ -193,7 +199,13 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return nil, errors.Wrap(err, "remove any existing index at path") return nil, errors.Wrap(err, "remove any existing index at path")
} }
f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) // Main index file we are building.
f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
// Temporary file for posting offset table.
fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -202,11 +214,14 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
} }
iw := &Writer{ iw := &Writer{
ctx: ctx, ctx: ctx,
f: f, f: f,
fbuf: bufio.NewWriterSize(f, 1<<22), fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0, pos: 0,
stage: idxStageNone, fPO: fPO,
fbufPO: bufio.NewWriterSize(fPO, 1<<22),
posPO: 0,
stage: idxStageNone,
// Reusable memory. // Reusable memory.
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
@ -564,39 +579,60 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
// writePostingsOffsetTable writes the postings offset table. // writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error { func (w *Writer) writePostingsOffsetTable() error {
startPos := w.pos // Ensure everything is in the temporary file.
// Leave 4 bytes of space for the length, which will be calculated later. if err := w.fbufPO.Flush(); err != nil {
if err := w.write([]byte("alen")); err != nil {
return err return err
} }
w.crc32.Reset()
w.buf1.Reset() w.buf1.Reset()
w.buf1.PutBE32int(len(w.postings)) w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count.
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil { if err := w.write(w.buf1.Get()); err != nil {
return err return err
} }
for _, e := range w.postings {
w.buf1.Reset()
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(e.name)
w.buf1.PutUvarintStr(e.value)
w.buf1.PutUvarint64(e.offset)
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
}
// Write out the length.
w.buf1.Reset() w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4)) w.crc32.Reset()
if err := w.writeAt(w.buf1.Get(), startPos); err != nil { w.buf1.PutBE32int(int(w.cntPO)) // Count.
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
// Copy temporary file into main index.
if _, err := w.fPO.Seek(0, 0); err != nil {
return err return err
} }
buf := make([]byte, 1<<20)
l := 0
for {
n, err := w.fPO.Read(buf)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
break
}
l += n
w.crc32.Write(buf[:n])
if err := w.write(buf[:n]); err != nil {
return err
}
}
if w.posPO != uint64(l) {
return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l)
}
// Cleanup temporary file.
name := w.fPO.Name()
if err := w.fPO.Close(); err != nil {
return err
}
w.fPO = nil
if err := os.Remove(name); err != nil {
return err
}
// Finally write the hash.
w.buf1.Reset() w.buf1.Reset()
w.buf1.PutHashSum(w.crc32) w.buf1.PutHashSum(w.crc32)
return w.write(w.buf1.Get()) return w.write(w.buf1.Get())
@ -749,11 +785,18 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
return err return err
} }
w.postings = append(w.postings, postingsHashEntry{ // Write out postings offset table to temporary file as we go.
name: name, w.buf1.Reset()
value: value, w.buf1.PutUvarint(2)
offset: w.pos, w.buf1.PutUvarintStr(name)
}) w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.pos)
if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil {
return err
} else {
w.posPO += uint64(n)
}
w.cntPO++
w.buf1.Reset() w.buf1.Reset()
w.buf1.PutBE32int(len(offs)) w.buf1.PutBE32int(len(offs))
@ -796,6 +839,11 @@ func (w *Writer) Close() error {
return err return err
} }
} }
if w.fPO != nil {
if err := w.fPO.Close(); err != nil {
return err
}
}
if err := w.fbuf.Flush(); err != nil { if err := w.fbuf.Flush(); err != nil {
return err return err
} }