From 863613300de075964d58f52c26c6ea9d1dc15646 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 9 Jan 2020 11:28:10 +0000 Subject: [PATCH] Exposed FileWriter. (#6589) Signed-off-by: Bartlomiej Plotka --- tsdb/index/index.go | 82 +++++++++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 81d4a808b..b4da89ce2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -108,12 +108,12 @@ type Writer struct { ctx context.Context // For the main index file. - f *fileWriter + f *FileWriter // Temporary file for postings. - fP *fileWriter + fP *FileWriter // Temporary file for posting offsets table. - fPO *fileWriter + fPO *FileWriter cntPO uint64 toc TOC @@ -194,17 +194,17 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } // Main index file we are building. - f, err := newFileWriter(fn) + f, err := NewFileWriter(fn) if err != nil { return nil, err } // Temporary file for postings. - fP, err := newFileWriter(fn + "_tmp_p") + fP, err := NewFileWriter(fn + "_tmp_p") if err != nil { return nil, err } // Temporary file for posting offset table. - fPO, err := newFileWriter(fn + "_tmp_po") + fPO, err := NewFileWriter(fn + "_tmp_po") if err != nil { return nil, err } @@ -233,30 +233,30 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } func (w *Writer) write(bufs ...[]byte) error { - return w.f.write(bufs...) + return w.f.Write(bufs...) } func (w *Writer) writeAt(buf []byte, pos uint64) error { - return w.f.writeAt(buf, pos) + return w.f.WriteAt(buf, pos) } func (w *Writer) addPadding(size int) error { - return w.f.addPadding(size) + return w.f.AddPadding(size) } -type fileWriter struct { +type FileWriter struct { f *os.File fbuf *bufio.Writer pos uint64 name string } -func newFileWriter(name string) (*fileWriter, error) { +func NewFileWriter(name string) (*FileWriter, error) { f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) if err != nil { return nil, err } - return &fileWriter{ + return &FileWriter{ f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, @@ -264,7 +264,11 @@ func newFileWriter(name string) (*fileWriter, error) { }, nil } -func (fw *fileWriter) write(bufs ...[]byte) error { +func (fw *FileWriter) Pos() uint64 { + return fw.pos +} + +func (fw *FileWriter) Write(bufs ...[]byte) error { for _, b := range bufs { n, err := fw.fbuf.Write(b) fw.pos += uint64(n) @@ -282,12 +286,12 @@ func (fw *fileWriter) write(bufs ...[]byte) error { return nil } -func (fw *fileWriter) flush() error { +func (fw *FileWriter) Flush() error { return fw.fbuf.Flush() } -func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { - if err := fw.flush(); err != nil { +func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error { + if err := fw.Flush(); err != nil { return err } _, err := fw.f.WriteAt(buf, int64(pos)) @@ -295,17 +299,21 @@ func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { } // addPadding adds zero byte padding until the file size is a multiple size. -func (fw *fileWriter) addPadding(size int) error { +func (fw *FileWriter) AddPadding(size int) error { p := fw.pos % uint64(size) if p == 0 { return nil } p = uint64(size) - p - return errors.Wrap(fw.write(make([]byte, p)), "add padding") + + if err := fw.Write(make([]byte, p)); err != nil { + return errors.Wrap(err, "add padding") + } + return nil } -func (fw *fileWriter) close() error { - if err := fw.flush(); err != nil { +func (fw *FileWriter) Close() error { + if err := fw.Flush(); err != nil { return err } if err := fw.f.Sync(); err != nil { @@ -314,7 +322,7 @@ func (fw *fileWriter) close() error { return fw.f.Close() } -func (fw *fileWriter) remove() error { +func (fw *FileWriter) Remove() error { return os.Remove(fw.name) } @@ -506,7 +514,7 @@ func (w *Writer) finishSymbols() error { if err := w.write([]byte("hash")); err != nil { return err } - if err := w.f.flush(); err != nil { + if err := w.f.Flush(); err != nil { return err } @@ -531,7 +539,7 @@ func (w *Writer) finishSymbols() error { } func (w *Writer) writeLabelIndices() error { - if err := w.fPO.flush(); err != nil { + if err := w.fPO.Flush(); err != nil { return err } @@ -673,7 +681,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { // Ensure everything is in the temporary file. - if err := w.fPO.flush(); err != nil { + if err := w.fPO.Flush(); err != nil { return err } @@ -727,10 +735,10 @@ func (w *Writer) writePostingsOffsetTable() error { return err } f = nil - if err := w.fPO.close(); err != nil { + if err := w.fPO.Close(); err != nil { return err } - if err := w.fPO.remove(); err != nil { + if err := w.fPO.Remove(); err != nil { return err } w.fPO = nil @@ -772,7 +780,7 @@ func (w *Writer) writePostingsToTmpFiles() error { } sort.Strings(names) - if err := w.f.flush(); err != nil { + if err := w.f.Flush(); err != nil { return err } f, err := fileutil.OpenMmapFile(w.f.name) @@ -892,7 +900,7 @@ func (w *Writer) writePostingsToTmpFiles() error { func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. - if err := w.fP.addPadding(4); err != nil { + if err := w.fP.AddPadding(4); err != nil { return err } @@ -902,7 +910,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(value) w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. - if err := w.fPO.write(w.buf1.Get()); err != nil { + if err := w.fPO.Write(w.buf1.Get()); err != nil { return err } w.cntPO++ @@ -920,18 +928,18 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf2.Reset() w.buf2.PutBE32int(w.buf1.Len()) w.buf1.PutHash(w.crc32) - return w.fP.write(w.buf2.Get(), w.buf1.Get()) + return w.fP.Write(w.buf2.Get(), w.buf1.Get()) } func (w *Writer) writePostings() error { // There's padding in the tmp file, make sure it actually works. - if err := w.f.addPadding(4); err != nil { + if err := w.f.AddPadding(4); err != nil { return err } w.postingsStart = w.f.pos // Copy temporary file into main index. - if err := w.fP.flush(); err != nil { + if err := w.fP.Flush(); err != nil { return err } if _, err := w.fP.f.Seek(0, 0); err != nil { @@ -947,10 +955,10 @@ func (w *Writer) writePostings() error { } w.f.pos += uint64(n) - if err := w.fP.close(); err != nil { + if err := w.fP.Close(); err != nil { return err } - if err := w.fP.remove(); err != nil { + if err := w.fP.Remove(); err != nil { return err } w.fP = nil @@ -978,16 +986,16 @@ func (w *Writer) Close() error { } } if w.fP != nil { - if err := w.fP.close(); err != nil { + if err := w.fP.Close(); err != nil { return err } } if w.fPO != nil { - if err := w.fPO.close(); err != nil { + if err := w.fPO.Close(); err != nil { return err } } - if err := w.f.close(); err != nil { + if err := w.f.Close(); err != nil { return err } return ensureErr