Put postings in a temporary file during index writing.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-12-18 00:55:29 +00:00
parent dee6981a6c
commit 7d1aad46b8

View file

@ -110,6 +110,8 @@ type Writer struct {
// For the main index file.
f *fileWriter
// Temporary file for postings.
fP *fileWriter
// Temporary file for posting offsets table.
fPO *fileWriter
cntPO uint64
@ -196,6 +198,11 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
if err != nil {
return nil, err
}
// Temporary file for postings.
fP, err := newFileWriter(fn + "_tmp_p")
if err != nil {
return nil, err
}
// Temporary file for posting offset table.
fPO, err := newFileWriter(fn + "_tmp_po")
if err != nil {
@ -208,6 +215,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
iw := &Writer{
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
stage: idxStageNone,
@ -323,6 +331,10 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
if w.stage+1 < s {
// A stage has been skipped.
w.ensureStage(s - 1)
}
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
@ -342,6 +354,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
case idxStageDone:
w.toc.LabelIndices = w.f.pos
// LabelIndices generation depends on the posting offset
// table produced at this stage.
if err := w.writePostingsToTmpFiles(); err != nil {
return err
}
if err := w.writeLabelIndices(); err != nil {
return err
}
@ -355,6 +372,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
}
w.toc.PostingsTable = w.f.pos
if err := w.writePostingsOffsetTable(); err != nil {
return err
@ -631,12 +649,16 @@ func (w *Writer) writePostingsOffsetTable() error {
return err
}
w.buf1.Reset()
w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count.
if err := w.write(w.buf1.Get()); err != nil {
startPos := w.f.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
}
// Copy over the tmp posting offset table, however we need to
// adjust the offsets.
adjustment := w.toc.Postings
w.buf1.Reset()
w.crc32.Reset()
w.buf1.PutBE32int(int(w.cntPO)) // Count.
@ -644,29 +666,28 @@ func (w *Writer) writePostingsOffsetTable() error {
if err := w.write(w.buf1.Get()); err != nil {
return err
}
// Copy temporary file into main index.
if _, err := w.fPO.f.Seek(0, 0); err != nil {
f, err := fileutil.OpenMmapFile(w.fPO.name)
if err != nil {
return err
}
buf := make([]byte, 1<<20)
l := 0
for {
n, err := w.fPO.f.Read(buf)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
break
}
l += n
w.crc32.Write(buf[:n])
if err := w.write(buf[:n]); err != nil {
defer f.Close()
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
w.buf1.PutUvarint(d.Uvarint()) // Keycount.
w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name.
w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value.
w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset.
w.buf1.WriteToHash(w.crc32)
if err := w.write(w.buf1.Get()); err != nil {
return err
}
cnt--
}
if w.fPO.pos != uint64(l) {
return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l)
if d.Err() != nil {
return d.Err()
}
// Cleanup temporary file.
@ -678,6 +699,13 @@ func (w *Writer) writePostingsOffsetTable() error {
}
w.fPO = nil
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
// Finally write the hash.
w.buf1.Reset()
w.buf1.PutHashSum(w.crc32)
@ -701,7 +729,7 @@ func (w *Writer) writeTOC() error {
return w.write(w.buf1.Get())
}
func (w *Writer) writePostings() error {
func (w *Writer) writePostingsToTmpFiles() error {
names := make([]string, 0, len(w.labelNames))
for n := range w.labelNames {
names = append(names, n)
@ -729,9 +757,10 @@ func (w *Writer) writePostings() error {
}
offsets = append(offsets, uint32(startPos/16))
// Skip to next series.
d.Skip(d.Uvarint() + crc32.Size)
x := d.Uvarint()
d.Skip(x + crc32.Size)
if err := d.Err(); err != nil {
return nil
return err
}
}
if err := w.writePosting("", "", offsets); err != nil {
@ -827,7 +856,7 @@ func (w *Writer) writePostings() error {
func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.addPadding(4); err != nil {
if err := w.fP.addPadding(4); err != nil {
return err
}
@ -836,7 +865,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(name)
w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.f.pos)
w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.write(w.buf1.Get()); err != nil {
return err
}
@ -855,7 +884,37 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf2.Reset()
w.buf2.PutBE32int(w.buf1.Len())
w.buf1.PutHash(w.crc32)
return w.write(w.buf2.Get(), w.buf1.Get())
return w.fP.write(w.buf2.Get(), w.buf1.Get())
}
func (w *Writer) writePostings() error {
// There's padding in the tmp filem make sure it actually works.
if err := w.f.addPadding(4); err != nil {
return err
}
// Copy temporary file into main index.
if err := w.fP.flush(); err != nil {
return err
}
if _, err := w.fP.f.Seek(0, 0); err != nil {
return err
}
// Don't need to calculate a checksum, so can copy directly.
n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20))
if err != nil {
return err
}
if uint64(n) != w.fP.pos {
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
}
w.f.pos += uint64(n)
if err := w.fP.remove(); err != nil {
return err
}
w.fP = nil
return nil
}
type uint32slice []uint32
@ -883,6 +942,11 @@ func (w *Writer) Close() error {
return err
}
}
if w.fP != nil {
if err := w.fP.close(); err != nil {
return err
}
}
if w.fPO != nil {
if err := w.fPO.close(); err != nil {
return err