diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 1470c7776..06c7e2563 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -110,6 +110,8 @@ type Writer struct { // For the main index file. f *fileWriter + // Temporary file for postings. + fP *fileWriter // Temporary file for posting offsets table. fPO *fileWriter cntPO uint64 @@ -196,6 +198,11 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { if err != nil { return nil, err } + // Temporary file for postings. + fP, err := newFileWriter(fn + "_tmp_p") + if err != nil { + return nil, err + } // Temporary file for posting offset table. fPO, err := newFileWriter(fn + "_tmp_po") if err != nil { @@ -208,6 +215,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { iw := &Writer{ ctx: ctx, f: f, + fP: fP, fPO: fPO, stage: idxStageNone, @@ -323,6 +331,10 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } + if w.stage+1 < s { + // A stage has been skipped. + w.ensureStage(s - 1) + } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } @@ -342,6 +354,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { case idxStageDone: w.toc.LabelIndices = w.f.pos + // LabelIndices generation depends on the posting offset + // table produced at this stage. + if err := w.writePostingsToTmpFiles(); err != nil { + return err + } if err := w.writeLabelIndices(); err != nil { return err } @@ -355,6 +372,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } + w.toc.PostingsTable = w.f.pos if err := w.writePostingsOffsetTable(); err != nil { return err @@ -631,12 +649,16 @@ func (w *Writer) writePostingsOffsetTable() error { return err } - w.buf1.Reset() - w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count. 
- if err := w.write(w.buf1.Get()); err != nil { + startPos := w.f.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { return err } + // Copy over the tmp posting offset table, however we need to + // adjust the offsets. + adjustment := w.toc.Postings + w.buf1.Reset() w.crc32.Reset() w.buf1.PutBE32int(int(w.cntPO)) // Count. @@ -644,29 +666,28 @@ func (w *Writer) writePostingsOffsetTable() error { if err := w.write(w.buf1.Get()); err != nil { return err } - // Copy temporary file into main index. - if _, err := w.fPO.f.Seek(0, 0); err != nil { + + f, err := fileutil.OpenMmapFile(w.fPO.name) + if err != nil { return err } - - buf := make([]byte, 1<<20) - l := 0 - for { - n, err := w.fPO.f.Read(buf) - if err != nil && err != io.EOF { - return err - } - if n == 0 { - break - } - l += n - w.crc32.Write(buf[:n]) - if err := w.write(buf[:n]); err != nil { + defer f.Close() + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + for d.Err() == nil && cnt > 0 { + w.buf1.Reset() + w.buf1.PutUvarint(d.Uvarint()) // Keycount. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value. + w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset. + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { return err } + cnt-- } - if w.fPO.pos != uint64(l) { - return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l) + if d.Err() != nil { + return d.Err() } // Cleanup temporary file. @@ -678,6 +699,13 @@ func (w *Writer) writePostingsOffsetTable() error { } w.fPO = nil + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err + } + // Finally write the hash. 
 w.buf1.Reset() w.buf1.PutHashSum(w.crc32) @@ -701,7 +729,7 @@ func (w *Writer) writeTOC() error { return w.write(w.buf1.Get()) } -func (w *Writer) writePostings() error { +func (w *Writer) writePostingsToTmpFiles() error { names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) @@ -729,9 +757,10 @@ func (w *Writer) writePostings() error { } offsets = append(offsets, uint32(startPos/16)) // Skip to next series. - d.Skip(d.Uvarint() + crc32.Size) + x := d.Uvarint() + d.Skip(x + crc32.Size) if err := d.Err(); err != nil { - return nil + return err } } if err := w.writePosting("", "", offsets); err != nil { @@ -827,7 +856,7 @@ func (w *Writer) writePostings() error { func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. - if err := w.addPadding(4); err != nil { + if err := w.fP.addPadding(4); err != nil { return err } @@ -836,7 +865,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf1.PutUvarint(2) w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(value) - w.buf1.PutUvarint64(w.f.pos) + w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. if err := w.fPO.write(w.buf1.Get()); err != nil { return err } @@ -855,7 +884,37 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf2.Reset() w.buf2.PutBE32int(w.buf1.Len()) w.buf1.PutHash(w.crc32) - return w.write(w.buf2.Get(), w.buf1.Get()) + return w.fP.write(w.buf2.Get(), w.buf1.Get()) +} + +func (w *Writer) writePostings() error { + // There's padding in the tmp file, make sure it actually works. + if err := w.f.addPadding(4); err != nil { + return err + } + + // Copy temporary file into main index. + if err := w.fP.flush(); err != nil { + return err + } + if _, err := w.fP.f.Seek(0, 0); err != nil { + return err + } + // Don't need to calculate a checksum, so can copy directly. 
+ n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20)) + if err != nil { + return err + } + if uint64(n) != w.fP.pos { + return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n) + } + w.f.pos += uint64(n) + + if err := w.fP.remove(); err != nil { + return err + } + w.fP = nil + return nil } type uint32slice []uint32 @@ -883,6 +942,11 @@ func (w *Writer) Close() error { return err } } + if w.fP != nil { + if err := w.fP.close(); err != nil { + return err + } + } if w.fPO != nil { if err := w.fPO.close(); err != nil { return err