diff --git a/db.go b/db.go index 42f01b84af..a1ac862698 100644 --- a/db.go +++ b/db.go @@ -333,6 +333,8 @@ func (s *Shard) persist() error { s.mtx.Unlock() + // Only allow another persistence to be triggered after the current one + // has completed (successful or not.) defer func() { <-s.persistCh }() @@ -345,11 +347,11 @@ func (s *Shard) persist() error { return err } - sf, err := os.Create(filepath.Join(p, "series")) + sf, err := os.Create(chunksFileName(p)) if err != nil { return err } - xf, err := os.Create(filepath.Join(p, "index")) + xf, err := os.Create(indexFileName(p)) if err != nil { return err } @@ -360,36 +362,24 @@ func (s *Shard) persist() error { defer sw.Close() defer iw.Close() - for ref, cd := range head.index.forward { - if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil { - return err - } - } - - if err := iw.WriteStats(s.head.stats); err != nil { + if err := head.persist(sw, iw); err != nil { return err } - for n, v := range head.index.values { - s := make([]string, 0, len(v)) - for x := range v { - s = append(s, x) - } - - if err := iw.WriteLabelIndex([]string{n}, s); err != nil { - return err - } - } - - for t := range head.index.postings.m { - if err := iw.WritePostings(t.name, t.value, head.index.postings.get(t)); err != nil { - return err - } - } sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024) s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head") + // Reopen block as persisted block for querying. + pb, err := newPersistedBlock(p) + if err != nil { + return err + } + + s.mtx.Lock() + s.persisted = append(s.persisted, pb) + s.mtx.Unlock() + return nil } diff --git a/head.go b/head.go index a358a8e1ce..6f6ae0593e 100644 --- a/head.go +++ b/head.go @@ -134,3 +134,33 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error return nil } + +func (h *HeadBlock) persist(sw SeriesWriter, iw IndexWriter) error { + for ref, cd := range h.index.forward { + if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil { + return err + } + } + + if err := iw.WriteStats(h.stats); err != nil { + return err + } + for n, v := range h.index.values { + s := make([]string, 0, len(v)) + for x := range v { + s = append(s, x) + } + + if err := iw.WriteLabelIndex([]string{n}, s); err != nil { + return err + } + } + + for t := range h.index.postings.m { + if err := iw.WritePostings(t.name, t.value, h.index.postings.get(t)); err != nil { + return err + } + } + + return nil +}