diff --git a/compact.go b/compact.go index 412a24788..73018ca20 100644 --- a/compact.go +++ b/compact.go @@ -177,7 +177,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { } indexw := newIndexWriter(indexf) - chunkw := newSeriesWriter(chunkf, indexw) + chunkw := newChunkWriter(chunkf) if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -204,7 +204,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return nil } -func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error { +func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error { var set compactionSet for i, b := range blocks { @@ -238,10 +238,12 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw for set.Next() { lset, chunks := set.At() - if err := chunkw.WriteSeries(i, lset, chunks); err != nil { + if err := chunkw.WriteChunks(chunks...); err != nil { return err } + indexw.AddSeries(i, lset, chunks...) + meta.Stats.NumChunks += uint64(len(chunks)) meta.Stats.NumSeries++ diff --git a/writer.go b/writer.go index 2229ff02a..6b4e31deb 100644 --- a/writer.go +++ b/writer.go @@ -25,12 +25,13 @@ const ( const compactionPageBytes = minSectorSize * 64 -// SeriesWriter serializes a time block of chunked series data. -type SeriesWriter interface { - // WriteSeries writes the time series data chunks for a single series. - // The reference is used to resolve the correct series in the written index. - // It only has to be valid for the duration of the write. - WriteSeries(ref uint32, l labels.Labels, chunks []ChunkMeta) error +// ChunkWriter serializes a time block of chunked series data. +type ChunkWriter interface { + // WriteChunks writes several chunks. The data field of the ChunkMetas + // must be populated. + // After returning successfully, the Ref fields in the ChunkMetas + // is set and can be used to retrieve the chunks from the written data. + WriteChunks(chunks ...ChunkMeta) error // Size returns the size of the data written so far. Size() int64 @@ -40,35 +41,32 @@ type SeriesWriter interface { Close() error } -// seriesWriter implements the SeriesWriter interface for the standard +// chunkWriter implements the ChunkWriter interface for the standard // serialization format. -type seriesWriter struct { +type chunkWriter struct { ow io.Writer w *bufio.Writer n int64 c int crc32 hash.Hash - - index IndexWriter } -func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { - return &seriesWriter{ +func newChunkWriter(w io.Writer) *chunkWriter { + return &chunkWriter{ ow: w, w: bufio.NewWriterSize(w, 1*1024*1024), n: 0, crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), - index: index, } } -func (w *seriesWriter) write(wr io.Writer, b []byte) error { +func (w *chunkWriter) write(wr io.Writer, b []byte) error { n, err := wr.Write(b) w.n += int64(n) return err } -func (w *seriesWriter) writeMeta() error { +func (w *chunkWriter) writeMeta() error { b := [8]byte{} binary.BigEndian.PutUint32(b[:4], MagicSeries) @@ -77,7 +75,7 @@ func (w *seriesWriter) writeMeta() error { return w.write(w.w, b[:]) } -func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkMeta) error { +func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { // Initialize with meta data. if w.n == 0 { if err := w.writeMeta(); err != nil { @@ -122,18 +120,14 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { return err } - - if w.index != nil { - w.index.AddSeries(ref, lset, chks...) - } return nil } -func (w *seriesWriter) Size() int64 { +func (w *chunkWriter) Size() int64 { return w.n } -func (w *seriesWriter) Close() error { +func (w *chunkWriter) Close() error { // Initialize block in case no data was written to it. if w.n == 0 { if err := w.writeMeta(); err != nil {