Exposed FileWriter. (#6589)

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-01-09 11:28:10 +00:00 committed by Brian Brazil
parent 835c27a76b
commit 863613300d

View file

@ -108,12 +108,12 @@ type Writer struct {
ctx context.Context ctx context.Context
// For the main index file. // For the main index file.
f *fileWriter f *FileWriter
// Temporary file for postings. // Temporary file for postings.
fP *fileWriter fP *FileWriter
// Temporary file for posting offsets table. // Temporary file for posting offsets table.
fPO *fileWriter fPO *FileWriter
cntPO uint64 cntPO uint64
toc TOC toc TOC
@ -194,17 +194,17 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
} }
// Main index file we are building. // Main index file we are building.
f, err := newFileWriter(fn) f, err := NewFileWriter(fn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Temporary file for postings. // Temporary file for postings.
fP, err := newFileWriter(fn + "_tmp_p") fP, err := NewFileWriter(fn + "_tmp_p")
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Temporary file for posting offset table. // Temporary file for posting offset table.
fPO, err := newFileWriter(fn + "_tmp_po") fPO, err := NewFileWriter(fn + "_tmp_po")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -233,30 +233,30 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
} }
func (w *Writer) write(bufs ...[]byte) error { func (w *Writer) write(bufs ...[]byte) error {
return w.f.write(bufs...) return w.f.Write(bufs...)
} }
func (w *Writer) writeAt(buf []byte, pos uint64) error { func (w *Writer) writeAt(buf []byte, pos uint64) error {
return w.f.writeAt(buf, pos) return w.f.WriteAt(buf, pos)
} }
func (w *Writer) addPadding(size int) error { func (w *Writer) addPadding(size int) error {
return w.f.addPadding(size) return w.f.AddPadding(size)
} }
type fileWriter struct { type FileWriter struct {
f *os.File f *os.File
fbuf *bufio.Writer fbuf *bufio.Writer
pos uint64 pos uint64
name string name string
} }
func newFileWriter(name string) (*fileWriter, error) { func NewFileWriter(name string) (*FileWriter, error) {
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &fileWriter{ return &FileWriter{
f: f, f: f,
fbuf: bufio.NewWriterSize(f, 1<<22), fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0, pos: 0,
@ -264,7 +264,11 @@ func newFileWriter(name string) (*fileWriter, error) {
}, nil }, nil
} }
func (fw *fileWriter) write(bufs ...[]byte) error { func (fw *FileWriter) Pos() uint64 {
return fw.pos
}
func (fw *FileWriter) Write(bufs ...[]byte) error {
for _, b := range bufs { for _, b := range bufs {
n, err := fw.fbuf.Write(b) n, err := fw.fbuf.Write(b)
fw.pos += uint64(n) fw.pos += uint64(n)
@ -282,12 +286,12 @@ func (fw *fileWriter) write(bufs ...[]byte) error {
return nil return nil
} }
func (fw *fileWriter) flush() error { func (fw *FileWriter) Flush() error {
return fw.fbuf.Flush() return fw.fbuf.Flush()
} }
func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {
if err := fw.flush(); err != nil { if err := fw.Flush(); err != nil {
return err return err
} }
_, err := fw.f.WriteAt(buf, int64(pos)) _, err := fw.f.WriteAt(buf, int64(pos))
@ -295,17 +299,21 @@ func (fw *fileWriter) writeAt(buf []byte, pos uint64) error {
} }
// addPadding adds zero byte padding until the file size is a multiple size. // addPadding adds zero byte padding until the file size is a multiple size.
func (fw *fileWriter) addPadding(size int) error { func (fw *FileWriter) AddPadding(size int) error {
p := fw.pos % uint64(size) p := fw.pos % uint64(size)
if p == 0 { if p == 0 {
return nil return nil
} }
p = uint64(size) - p p = uint64(size) - p
return errors.Wrap(fw.write(make([]byte, p)), "add padding")
if err := fw.Write(make([]byte, p)); err != nil {
return errors.Wrap(err, "add padding")
}
return nil
} }
func (fw *fileWriter) close() error { func (fw *FileWriter) Close() error {
if err := fw.flush(); err != nil { if err := fw.Flush(); err != nil {
return err return err
} }
if err := fw.f.Sync(); err != nil { if err := fw.f.Sync(); err != nil {
@ -314,7 +322,7 @@ func (fw *fileWriter) close() error {
return fw.f.Close() return fw.f.Close()
} }
func (fw *fileWriter) remove() error { func (fw *FileWriter) Remove() error {
return os.Remove(fw.name) return os.Remove(fw.name)
} }
@ -506,7 +514,7 @@ func (w *Writer) finishSymbols() error {
if err := w.write([]byte("hash")); err != nil { if err := w.write([]byte("hash")); err != nil {
return err return err
} }
if err := w.f.flush(); err != nil { if err := w.f.Flush(); err != nil {
return err return err
} }
@ -531,7 +539,7 @@ func (w *Writer) finishSymbols() error {
} }
func (w *Writer) writeLabelIndices() error { func (w *Writer) writeLabelIndices() error {
if err := w.fPO.flush(); err != nil { if err := w.fPO.Flush(); err != nil {
return err return err
} }
@ -673,7 +681,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
// writePostingsOffsetTable writes the postings offset table. // writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error { func (w *Writer) writePostingsOffsetTable() error {
// Ensure everything is in the temporary file. // Ensure everything is in the temporary file.
if err := w.fPO.flush(); err != nil { if err := w.fPO.Flush(); err != nil {
return err return err
} }
@ -727,10 +735,10 @@ func (w *Writer) writePostingsOffsetTable() error {
return err return err
} }
f = nil f = nil
if err := w.fPO.close(); err != nil { if err := w.fPO.Close(); err != nil {
return err return err
} }
if err := w.fPO.remove(); err != nil { if err := w.fPO.Remove(); err != nil {
return err return err
} }
w.fPO = nil w.fPO = nil
@ -772,7 +780,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
} }
sort.Strings(names) sort.Strings(names)
if err := w.f.flush(); err != nil { if err := w.f.Flush(); err != nil {
return err return err
} }
f, err := fileutil.OpenMmapFile(w.f.name) f, err := fileutil.OpenMmapFile(w.f.name)
@ -892,7 +900,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
func (w *Writer) writePosting(name, value string, offs []uint32) error { func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans. // Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.addPadding(4); err != nil { if err := w.fP.AddPadding(4); err != nil {
return err return err
} }
@ -902,7 +910,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(name)
w.buf1.PutUvarintStr(value) w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.write(w.buf1.Get()); err != nil { if err := w.fPO.Write(w.buf1.Get()); err != nil {
return err return err
} }
w.cntPO++ w.cntPO++
@ -920,18 +928,18 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf2.Reset() w.buf2.Reset()
w.buf2.PutBE32int(w.buf1.Len()) w.buf2.PutBE32int(w.buf1.Len())
w.buf1.PutHash(w.crc32) w.buf1.PutHash(w.crc32)
return w.fP.write(w.buf2.Get(), w.buf1.Get()) return w.fP.Write(w.buf2.Get(), w.buf1.Get())
} }
func (w *Writer) writePostings() error { func (w *Writer) writePostings() error {
// There's padding in the tmp file, make sure it actually works. // There's padding in the tmp file, make sure it actually works.
if err := w.f.addPadding(4); err != nil { if err := w.f.AddPadding(4); err != nil {
return err return err
} }
w.postingsStart = w.f.pos w.postingsStart = w.f.pos
// Copy temporary file into main index. // Copy temporary file into main index.
if err := w.fP.flush(); err != nil { if err := w.fP.Flush(); err != nil {
return err return err
} }
if _, err := w.fP.f.Seek(0, 0); err != nil { if _, err := w.fP.f.Seek(0, 0); err != nil {
@ -947,10 +955,10 @@ func (w *Writer) writePostings() error {
} }
w.f.pos += uint64(n) w.f.pos += uint64(n)
if err := w.fP.close(); err != nil { if err := w.fP.Close(); err != nil {
return err return err
} }
if err := w.fP.remove(); err != nil { if err := w.fP.Remove(); err != nil {
return err return err
} }
w.fP = nil w.fP = nil
@ -978,16 +986,16 @@ func (w *Writer) Close() error {
} }
} }
if w.fP != nil { if w.fP != nil {
if err := w.fP.close(); err != nil { if err := w.fP.Close(); err != nil {
return err return err
} }
} }
if w.fPO != nil { if w.fPO != nil {
if err := w.fPO.close(); err != nil { if err := w.fPO.Close(); err != nil {
return err return err
} }
} }
if err := w.f.close(); err != nil { if err := w.f.Close(); err != nil {
return err return err
} }
return ensureErr return ensureErr