Factor out index file writing code.

Now that we have more than one file open at a time, deduplicate a bit.

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
Brian Brazil 2019-12-17 21:54:13 +00:00
parent 85964ce567
commit 1733724e30

View file

@ -112,14 +112,12 @@ func newCRC32() hash.Hash32 {
// serialization format.
type Writer struct {
ctx context.Context
f *os.File
fbuf *bufio.Writer
pos uint64
// For the main index file.
f *fileWriter
// Temporary file for posting offsets table.
fPO *os.File
fbufPO *bufio.Writer
posPO uint64
fPO *fileWriter
cntPO uint64
toc TOC
@ -200,12 +198,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
}
// Main index file we are building.
f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
f, err := newFileWriter(fn)
if err != nil {
return nil, err
}
// Temporary file for posting offset table.
fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666)
fPO, err := newFileWriter(fn + "_tmp_po")
if err != nil {
return nil, err
}
@ -216,11 +214,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
iw := &Writer{
ctx: ctx,
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
fPO: fPO,
fbufPO: bufio.NewWriterSize(fPO, 1<<22),
posPO: 0,
stage: idxStageNone,
// Reusable memory.
@ -238,9 +232,41 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
}
func (w *Writer) write(bufs ...[]byte) error {
return w.f.write(bufs...)
}
func (w *Writer) writeAt(buf []byte, pos uint64) error {
return w.f.writeAt(buf, pos)
}
func (w *Writer) addPadding(size int) error {
return w.f.addPadding(size)
}
type fileWriter struct {
f *os.File
fbuf *bufio.Writer
pos uint64
name string
}
func newFileWriter(name string) (*fileWriter, error) {
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, err
}
return &fileWriter{
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
name: name,
}, nil
}
func (fw *fileWriter) write(bufs ...[]byte) error {
for _, b := range bufs {
n, err := w.fbuf.Write(b)
w.pos += uint64(n)
n, err := fw.fbuf.Write(b)
fw.pos += uint64(n)
if err != nil {
return err
}
@ -248,29 +274,47 @@ func (w *Writer) write(bufs ...[]byte) error {
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if w.pos > 16*math.MaxUint32 {
if fw.pos > 16*math.MaxUint32 {
return errors.Errorf("exceeding max size of 64GiB")
}
}
return nil
}
func (w *Writer) writeAt(buf []byte, pos uint64) error {
if err := w.fbuf.Flush(); err != nil {
func (fw *fileWriter) flush() error {
return fw.fbuf.Flush()
}
func (fw *fileWriter) writeAt(buf []byte, pos uint64) error {
if err := fw.flush(); err != nil {
return err
}
_, err := w.f.WriteAt(buf, int64(pos))
_, err := fw.f.WriteAt(buf, int64(pos))
return err
}
// addPadding adds zero byte padding until the file size is a multiple size.
func (w *Writer) addPadding(size int) error {
p := w.pos % uint64(size)
func (fw *fileWriter) addPadding(size int) error {
p := fw.pos % uint64(size)
if p == 0 {
return nil
}
p = uint64(size) - p
return errors.Wrap(w.write(make([]byte, p)), "add padding")
return errors.Wrap(fw.write(make([]byte, p)), "add padding")
}
func (fw *fileWriter) close() error {
if err := fw.flush(); err != nil {
return err
}
if err := fw.f.Sync(); err != nil {
return err
}
return fw.f.Close()
}
func (fw *fileWriter) remove() error {
return os.Remove(fw.name)
}
// ensureStage handles transitions between write stages and ensures that IndexWriter
@ -292,7 +336,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.Symbols = w.pos
w.toc.Symbols = w.f.pos
if err := w.startSymbols(); err != nil {
return err
}
@ -300,22 +344,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
if err := w.finishSymbols(); err != nil {
return err
}
w.toc.Series = w.pos
w.toc.Series = w.f.pos
case idxStageLabelIndex:
w.toc.LabelIndices = w.pos
w.toc.LabelIndices = w.f.pos
case idxStageDone:
w.toc.Postings = w.pos
w.toc.Postings = w.f.pos
if err := w.writePostings(); err != nil {
return err
}
w.toc.LabelIndicesTable = w.pos
w.toc.LabelIndicesTable = w.f.pos
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
}
w.toc.PostingsTable = w.pos
w.toc.PostingsTable = w.f.pos
if err := w.writePostingsOffsetTable(); err != nil {
return err
}
@ -354,8 +398,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
return errors.Errorf("failed to write padding bytes: %v", err)
}
if w.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
if w.f.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
}
w.buf2.Reset()
@ -436,24 +480,24 @@ func (w *Writer) AddSymbol(sym string) error {
func (w *Writer) finishSymbols() error {
// Write out the length and symbol count.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4))
w.buf1.PutBE32int(int(w.f.pos - w.toc.Symbols - 4))
w.buf1.PutBE32int(int(w.numSymbols))
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
return err
}
hashPos := w.pos
hashPos := w.f.pos
// Leave space for the hash. We can only calculate it
// now that the number of symbols is known, so mmap and do it from there.
if err := w.write([]byte("hash")); err != nil {
return err
}
if err := w.fbuf.Flush(); err != nil {
if err := w.f.flush(); err != nil {
return err
}
var err error
w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name())
w.symbolFile, err = fileutil.OpenMmapFile(w.f.name)
if err != nil {
return err
}
@ -493,10 +537,10 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
keys: names,
offset: w.pos,
offset: w.f.pos,
})
startPos := w.pos
startPos := w.f.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
@ -527,7 +571,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
@ -539,7 +583,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
startPos := w.pos
startPos := w.f.pos
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
@ -567,7 +611,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
}
// Write out the length.
w.buf1.Reset()
w.buf1.PutBE32int(int(w.pos - startPos - 4))
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
return err
}
@ -580,12 +624,12 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
// writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error {
// Ensure everything is in the temporary file.
if err := w.fbufPO.Flush(); err != nil {
if err := w.fPO.flush(); err != nil {
return err
}
w.buf1.Reset()
w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count.
w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count.
if err := w.write(w.buf1.Get()); err != nil {
return err
}
@ -598,14 +642,14 @@ func (w *Writer) writePostingsOffsetTable() error {
return err
}
// Copy temporary file into main index.
if _, err := w.fPO.Seek(0, 0); err != nil {
if _, err := w.fPO.f.Seek(0, 0); err != nil {
return err
}
buf := make([]byte, 1<<20)
l := 0
for {
n, err := w.fPO.Read(buf)
n, err := w.fPO.f.Read(buf)
if err != nil && err != io.EOF {
return err
}
@ -618,19 +662,18 @@ func (w *Writer) writePostingsOffsetTable() error {
return err
}
}
if w.posPO != uint64(l) {
return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l)
if w.fPO.pos != uint64(l) {
return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l)
}
// Cleanup temporary file.
name := w.fPO.Name()
if err := w.fPO.Close(); err != nil {
if err := w.fPO.close(); err != nil {
return err
}
if err := w.fPO.remove(); err != nil {
return err
}
w.fPO = nil
if err := os.Remove(name); err != nil {
return err
}
// Finally write the hash.
w.buf1.Reset()
@ -662,10 +705,10 @@ func (w *Writer) writePostings() error {
}
sort.Strings(names)
if err := w.fbuf.Flush(); err != nil {
if err := w.f.flush(); err != nil {
return err
}
f, err := fileutil.OpenMmapFile(w.f.Name())
f, err := fileutil.OpenMmapFile(w.f.name)
if err != nil {
return err
}
@ -790,11 +833,9 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(name)
w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.pos)
if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil {
w.buf1.PutUvarint64(w.f.pos)
if err := w.fPO.write(w.buf1.Get()); err != nil {
return err
} else {
w.posPO += uint64(n)
}
w.cntPO++
@ -840,17 +881,11 @@ func (w *Writer) Close() error {
}
}
if w.fPO != nil {
if err := w.fPO.Close(); err != nil {
if err := w.fPO.close(); err != nil {
return err
}
}
if err := w.fbuf.Flush(); err != nil {
return err
}
if err := w.f.Sync(); err != nil {
return err
}
return w.f.Close()
return w.f.close()
}
// StringTuples provides access to a sorted list of string tuples.