index: validate current write stages

This commit is contained in:
Fabian Reinartz 2017-04-26 08:33:54 +02:00
parent 9b4eafcc4c
commit 8b1f514a2d

110
index.go
View file

@ -41,6 +41,44 @@ const (
const compactionPageBytes = minSectorSize * 64 const compactionPageBytes = minSectorSize * 64
type indexWriterSeries struct {
labels labels.Labels
chunks []*ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
}
type indexWriterSeriesSlice []*indexWriterSeries
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
}
type indexWriterStage uint8
const (
idxStagePopulate indexWriterStage = iota
idxStageLabelIndex
idxStagePostings
idxStageDone
)
func (s indexWriterStage) String() string {
switch s {
case idxStagePopulate:
return "populate"
case idxStageLabelIndex:
return "label index"
case idxStagePostings:
return "postings"
case idxStageDone:
return "done"
}
return "<unknown>"
}
// IndexWriter serializes the index for a block of series data. // IndexWriter serializes the index for a block of series data.
// The methods must generally be called in the order they are specified in. // The methods must generally be called in the order they are specified in.
type IndexWriter interface { type IndexWriter interface {
@ -63,19 +101,13 @@ type IndexWriter interface {
Close() error Close() error
} }
type indexWriterSeries struct {
labels labels.Labels
chunks []*ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
}
// indexWriter implements the IndexWriter interface for the standard // indexWriter implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type indexWriter struct { type indexWriter struct {
f *os.File f *os.File
fbuf *bufio.Writer fbuf *bufio.Writer
pos int pos int
started bool stage indexWriterStage
// Reusable memory. // Reusable memory.
buf1 encbuf buf1 encbuf
@ -105,9 +137,10 @@ func newIndexWriter(dir string) (*indexWriter, error) {
} }
iw := &indexWriter{ iw := &indexWriter{
f: f, f: f,
fbuf: bufio.NewWriterSize(f, 1<<22), fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0, pos: 0,
stage: idxStagePopulate,
// Reusable memory. // Reusable memory.
buf1: encbuf{b: make([]byte, 0, 1<<22)}, buf1: encbuf{b: make([]byte, 0, 1<<22)},
@ -192,15 +225,6 @@ func (w *indexWriter) writeSymbols() error {
return errors.Wrap(err, "write symbols") return errors.Wrap(err, "write symbols")
} }
type indexWriterSeriesSlice []*indexWriterSeries
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
}
func (w *indexWriter) writeSeries() error { func (w *indexWriter) writeSeries() error {
// Series must be stored sorted along their labels. // Series must be stored sorted along their labels.
series := make(indexWriterSeriesSlice, 0, len(w.series)) series := make(indexWriterSeriesSlice, 0, len(w.series))
@ -253,28 +277,31 @@ func (w *indexWriter) writeSeries() error {
return nil return nil
} }
func (w *indexWriter) init() error { // ensureStage handles transitions between write stages and ensures that IndexWriter
if err := w.writeSymbols(); err != nil { // methods are called in an order valid for the implementation.
return err func (w *indexWriter) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
} }
if err := w.writeSeries(); err != nil { if w.stage > s {
return err return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
if w.stage == idxStagePopulate {
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
w.stage = s
return nil
} }
w.started = true
return nil return nil
} }
func (w *indexWriter) ensureStarted() error {
if w.started {
return nil
}
return w.init()
}
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if err := w.ensureStarted(); err != nil { if err := w.ensureStage(idxStageLabelIndex); err != nil {
return errors.Wrap(err, "initialize") return errors.Wrap(err, "ensure stage")
} }
valt, err := newStringTuples(values, len(names)) valt, err := newStringTuples(values, len(names))
@ -305,8 +332,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
} }
func (w *indexWriter) WritePostings(name, value string, it Postings) error { func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if err := w.ensureStarted(); err != nil { if err := w.ensureStage(idxStagePostings); err != nil {
return errors.Wrap(err, "initialize") return errors.Wrap(err, "ensure stage")
} }
key := name + string(sep) + value key := name + string(sep) + value
@ -384,6 +411,9 @@ func (w *indexWriter) writeHashmap(h []hashEntry) error {
} }
func (w *indexWriter) finalize() error { func (w *indexWriter) finalize() error {
if err := w.ensureStage(idxStageDone); err != nil {
return errors.Wrap(err, "ensure stage")
}
// Write out hash maps to jump to correct label index and postings sections. // Write out hash maps to jump to correct label index and postings sections.
lo := uint32(w.pos) lo := uint32(w.pos)
if err := w.writeHashmap(w.labelIndexes); err != nil { if err := w.writeHashmap(w.labelIndexes); err != nil {