From 8b1f514a2def24cd22741805f97bc7aef30ecdeb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 26 Apr 2017 08:33:54 +0200 Subject: [PATCH] index: validate current write stages --- index.go | 110 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 70 insertions(+), 40 deletions(-) diff --git a/index.go b/index.go index 1e2ceb84e..53b437dbd 100644 --- a/index.go +++ b/index.go @@ -41,6 +41,44 @@ const ( const compactionPageBytes = minSectorSize * 64 +type indexWriterSeries struct { + labels labels.Labels + chunks []*ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference +} + +type indexWriterSeriesSlice []*indexWriterSeries + +func (s indexWriterSeriesSlice) Len() int { return len(s) } +func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s indexWriterSeriesSlice) Less(i, j int) bool { + return labels.Compare(s[i].labels, s[j].labels) < 0 +} + +type indexWriterStage uint8 + +const ( + idxStagePopulate indexWriterStage = iota + idxStageLabelIndex + idxStagePostings + idxStageDone +) + +func (s indexWriterStage) String() string { + switch s { + case idxStagePopulate: + return "populate" + case idxStageLabelIndex: + return "label index" + case idxStagePostings: + return "postings" + case idxStageDone: + return "done" + } + return "" +} + // IndexWriter serializes the index for a block of series data. // The methods must generally be called in the order they are specified in. type IndexWriter interface { @@ -63,19 +101,13 @@ type IndexWriter interface { Close() error } -type indexWriterSeries struct { - labels labels.Labels - chunks []*ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference -} - // indexWriter implements the IndexWriter interface for the standard // serialization format. type indexWriter struct { - f *os.File - fbuf *bufio.Writer - pos int - started bool + f *os.File + fbuf *bufio.Writer + pos int + stage indexWriterStage // Reusable memory. buf1 encbuf @@ -105,9 +137,10 @@ func newIndexWriter(dir string) (*indexWriter, error) { } iw := &indexWriter{ - f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + stage: idxStagePopulate, // Reusable memory. buf1: encbuf{b: make([]byte, 0, 1<<22)}, @@ -192,15 +225,6 @@ func (w *indexWriter) writeSymbols() error { return errors.Wrap(err, "write symbols") } -type indexWriterSeriesSlice []*indexWriterSeries - -func (s indexWriterSeriesSlice) Len() int { return len(s) } -func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s indexWriterSeriesSlice) Less(i, j int) bool { - return labels.Compare(s[i].labels, s[j].labels) < 0 -} - func (w *indexWriter) writeSeries() error { // Series must be stored sorted along their labels. series := make(indexWriterSeriesSlice, 0, len(w.series)) @@ -253,28 +277,31 @@ func (w *indexWriter) writeSeries() error { return nil } -func (w *indexWriter) init() error { - if err := w.writeSymbols(); err != nil { - return err +// ensureStage handles transitions between write stages and ensures that IndexWriter +// methods are called in an order valid for the implementation. +func (w *indexWriter) ensureStage(s indexWriterStage) error { + if w.stage == s { + return nil } - if err := w.writeSeries(); err != nil { - return err + if w.stage > s { + return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) + } + if w.stage == idxStagePopulate { + if err := w.writeSymbols(); err != nil { + return err + } + if err := w.writeSeries(); err != nil { + return err + } + w.stage = s + return nil } - w.started = true - return nil } -func (w *indexWriter) ensureStarted() error { - if w.started { - return nil - } - return w.init() -} - func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { - if err := w.ensureStarted(); err != nil { - return errors.Wrap(err, "initialize") + if err := w.ensureStage(idxStageLabelIndex); err != nil { + return errors.Wrap(err, "ensure stage") } valt, err := newStringTuples(values, len(names)) @@ -305,8 +332,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } func (w *indexWriter) WritePostings(name, value string, it Postings) error { - if err := w.ensureStarted(); err != nil { - return errors.Wrap(err, "initialize") + if err := w.ensureStage(idxStagePostings); err != nil { + return errors.Wrap(err, "ensure stage") } key := name + string(sep) + value @@ -384,6 +411,9 @@ func (w *indexWriter) writeHashmap(h []hashEntry) error { } func (w *indexWriter) finalize() error { + if err := w.ensureStage(idxStageDone); err != nil { + return errors.Wrap(err, "ensure stage") + } // Write out hash maps to jump to correct label index and postings sections. lo := uint32(w.pos) if err := w.writeHashmap(w.labelIndexes); err != nil {