diff --git a/head.go b/head.go index 9319980044..01110f1a29 100644 --- a/head.go +++ b/head.go @@ -269,13 +269,19 @@ func (h *HeadBlock) persist(p string) (int64, error) { } iw := newIndexWriter(xf) - sw := newSeriesWriter(sf, iw, h.stats.MinTime) + sw := newSeriesWriter(sf, iw) defer sw.Close() defer iw.Close() for ref, cd := range h.descs { - if err := sw.WriteSeries(uint32(ref), cd.lset, []*chunkDesc{cd}); err != nil { + if err := sw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{ + { + MinTime: cd.firsTimestamp, + MaxTime: cd.lastTimestamp, + Chunk: cd.chunk, + }, + }); err != nil { return 0, err } } @@ -299,6 +305,14 @@ func (h *HeadBlock) persist(p string) (int64, error) { return 0, err } } + // Write a postings list containing all series. + all := make([]uint32, len(h.descs)) + for i := range all { + all[i] = uint32(i) + } + if err := iw.WritePostings("", "", newListPostings(all)); err != nil { + return 0, err + } // Everything written successfully, we can remove the WAL. if err := h.wal.Close(); err != nil { diff --git a/writer.go b/writer.go index ff2154533e..e3f24f370c 100644 --- a/writer.go +++ b/writer.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/bradfitz/slice" + "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" ) @@ -27,7 +28,7 @@ type SeriesWriter interface { // WriteSeries writes the time series data chunks for a single series. // The reference is used to resolve the correct series in the written index. // It only has to be valid for the duration of the write. - WriteSeries(ref uint32, l labels.Labels, cds []*chunkDesc) error + WriteSeries(ref uint32, l labels.Labels, chunks []ChunkMeta) error // Size returns the size of the data written so far. Size() int64 @@ -44,16 +45,14 @@ type seriesWriter struct { n int64 c int - baseTimestamp int64 - index IndexWriter + index IndexWriter } -func newSeriesWriter(w io.Writer, index IndexWriter, base int64) *seriesWriter { +func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { return &seriesWriter{ - w: w, - n: 0, - index: index, - baseTimestamp: base, + w: w, + n: 0, + index: index, } } @@ -72,7 +71,7 @@ func (w *seriesWriter) writeMeta() error { return w.write(w.w, b[:]) } -func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunkDesc) error { +func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkMeta) error { // Initialize with meta data. if w.n == 0 { if err := w.writeMeta(); err != nil { @@ -96,25 +95,23 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunk return err } - metas := make([]ChunkMeta, 0, len(chks)) + for i := range chks { + chk := &chks[i] - for _, cd := range chks { - metas = append(metas, ChunkMeta{ - MinTime: cd.firsTimestamp, - MaxTime: cd.lastTimestamp, - Ref: uint32(w.n), - }) - n = binary.PutUvarint(b[:], uint64(len(cd.chunk.Bytes()))) + chk.Ref = uint32(w.n) + + n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) if err := w.write(wr, b[:n]); err != nil { return err } - if err := w.write(wr, []byte{byte(cd.chunk.Encoding())}); err != nil { + if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { return err } - if err := w.write(wr, cd.chunk.Bytes()); err != nil { + if err := w.write(wr, chk.Chunk.Bytes()); err != nil { return err } + chk.Chunk = nil } if err := w.write(w.w, h.Sum(nil)); err != nil { @@ -122,7 +119,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []*chunk } if w.index != nil { - w.index.AddSeries(ref, lset, metas...) + w.index.AddSeries(ref, lset, chks...) } return nil } @@ -144,10 +141,15 @@ func (w *seriesWriter) Close() error { return nil } +// ChunkMeta holds information about a chunk of data. type ChunkMeta struct { - Ref uint32 - MinTime int64 - MaxTime int64 + // Ref and Chunk hold either a reference that can be used to retrieve + // chunk data or the data itself. + // Generally, only one of them is set. + Ref uint32 + Chunk chunks.Chunk + + MinTime, MaxTime int64 // time range the data covers } // IndexWriter serialized the index for a block of series data.