diff --git a/block.go b/block.go index 53f8b6c4f..bc9f581ab 100644 --- a/block.go +++ b/block.go @@ -22,6 +22,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -112,7 +113,7 @@ type BlockStats struct { type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has // gone through. - Generation int `json:"generation"` + Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` } @@ -181,13 +182,13 @@ type persistedBlock struct { tombstones tombstoneReader } -func newPersistedBlock(dir string) (*persistedBlock, error) { +func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err } - cr, err := newChunkReader(chunkDir(dir)) + cr, err := newChunkReader(chunkDir(dir), pool) if err != nil { return nil, err } @@ -252,7 +253,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { stones := map[uint32]intervals{} var lset labels.Labels - var chks []*ChunkMeta + var chks []ChunkMeta Outer: for p.Next() { diff --git a/chunks.go b/chunks.go index 075384cd5..6bed69700 100644 --- a/chunks.go +++ b/chunks.go @@ -100,7 +100,7 @@ type ChunkWriter interface { // must be populated. // After returning successfully, the Ref fields in the ChunkMetas // are set and can be used to retrieve the chunks from the written data. - WriteChunks(chunks ...*ChunkMeta) error + WriteChunks(chunks ...ChunkMeta) error // Close writes any required finalization and closes the resources // associated with the underlying writer. @@ -222,7 +222,7 @@ func (w *chunkWriter) write(b []byte) error { return err } -func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { +func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. maxLen := int64(binary.MaxVarintLen32) // The number of chunks. @@ -238,23 +238,22 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { } } - b := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(b, uint64(len(chks))) + var ( + b = [binary.MaxVarintLen32]byte{} + seq = uint64(w.seq()) << 32 + ) + for i := range chks { + chk := &chks[i] - if err := w.write(b[:n]); err != nil { - return err - } - seq := uint64(w.seq()) << 32 - - for _, chk := range chks { chk.Ref = seq | uint64(w.n) - n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) + n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) if err := w.write(b[:n]); err != nil { return err } - if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { + b[0] = byte(chk.Chunk.Encoding()) + if err := w.write(b[:1]); err != nil { return err } if err := w.write(chk.Chunk.Bytes()); err != nil { @@ -265,7 +264,7 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { if err := chk.writeHash(w.crc32); err != nil { return err } - if err := w.write(w.crc32.Sum(nil)); err != nil { + if err := w.write(w.crc32.Sum(b[:0])); err != nil { return err } } @@ -298,15 +297,20 @@ type chunkReader struct { // Closers for resources behind the byte slices. cs []io.Closer + + pool chunks.Pool } // newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string) (*chunkReader, error) { +func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { files, err := sequenceFiles(dir, "") if err != nil { return nil, err } - var cr chunkReader + if pool == nil { + pool = chunks.NewPool() + } + cr := chunkReader{pool: pool} for _, fn := range files { f, err := openMmapFile(fn) @@ -353,11 +357,6 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { return nil, fmt.Errorf("reading chunk length failed") } b = b[n:] - enc := chunks.Encoding(b[0]) - c, err := chunks.FromData(enc, b[1:1+l]) - if err != nil { - return nil, err - } - return c, nil + return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) } diff --git a/chunks/chunk.go b/chunks/chunk.go index 6bed4455f..181693fae 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -13,7 +13,12 @@ package chunks -import "fmt" +import ( + "fmt" + "sync" + + "github.com/pkg/errors" +) // Encoding is the identifier for a chunk encoding. type Encoding uint8 @@ -63,3 +68,53 @@ type Iterator interface { Err() error Next() bool } + +type Pool interface { + Put(Chunk) error + Get(e Encoding, b []byte) (Chunk, error) +} + +// Pool is a memory pool of chunk objects. +type pool struct { + xor sync.Pool +} + +func NewPool() Pool { + return &pool{ + xor: sync.Pool{ + New: func() interface{} { + return &XORChunk{b: &bstream{}} + }, + }, + } +} + +func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { + switch e { + case EncXOR: + c := p.xor.Get().(*XORChunk) + c.b.stream = b + c.b.count = 0 + return c, nil + } + return nil, errors.Errorf("invalid encoding %q", e) +} + +func (p *pool) Put(c Chunk) error { + switch c.Encoding() { + case EncXOR: + xc, ok := c.(*XORChunk) + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. + if !ok { + return nil + } + xc.b.stream = nil + xc.b.count = 0 + p.xor.Put(c) + default: + return errors.Errorf("invalid encoding %q", c.Encoding()) + } + return nil +} diff --git a/compact.go b/compact.go index 0027cd50c..3860509d8 100644 --- a/compact.go +++ b/compact.go @@ -48,22 +48,22 @@ type Compactor interface { // Plan returns a set of non-overlapping directories that can // be compacted concurrently. // Results returned when compactions are in progress are undefined. - Plan() ([][]string, error) + Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(b Block) error + Write(dest string, b Block) error // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). - Compact(dirs ...string) error + Compact(dest string, dirs ...string) error } -// compactor implements the Compactor interface. -type compactor struct { +// LeveledCompactor implements the Compactor interface. +type LeveledCompactor struct { dir string metrics *compactorMetrics logger log.Logger - opts *compactorOptions + opts *LeveledCompactorOptions } type compactorMetrics struct { @@ -98,13 +98,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { return m } -type compactorOptions struct { +type LeveledCompactorOptions struct { blockRanges []int64 + chunkPool chunks.Pool } -func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor { - return &compactor{ - dir: dir, +func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor { + if opts == nil { + opts = &LeveledCompactorOptions{ + chunkPool: chunks.NewPool(), + } + } + return &LeveledCompactor{ opts: opts, logger: l, metrics: newCompactorMetrics(r), @@ -124,8 +129,9 @@ type dirMeta struct { meta *BlockMeta } -func (c *compactor) Plan() ([][]string, error) { - dirs, err := blockDirs(c.dir) +// Plan returns a list of compactable blocks in the provided directory. +func (c *LeveledCompactor) Plan(dir string) ([]string, error) { + dirs, err := blockDirs(dir) if err != nil { return nil, err } @@ -137,7 +143,7 @@ func (c *compactor) Plan() ([][]string, error) { if err != nil { return nil, err } - if meta.Compaction.Generation > 0 { + if meta.Compaction.Level > 0 { dms = append(dms, dirMeta{dir, meta}) } } @@ -149,20 +155,12 @@ func (c *compactor) Plan() ([][]string, error) { return nil, nil } - sliceDirs := func(dms []dirMeta) [][]string { - if len(dms) == 0 { - return nil - } - var res []string - for _, dm := range dms { - res = append(res, dm.dir) - } - return [][]string{res} + var res []string + for _, dm := range c.selectDirs(dms) { + res = append(res, dm.dir) } - - planDirs := sliceDirs(c.selectDirs(dms)) - if len(dirs) > 1 { - return planDirs, nil + if len(res) > 0 { + return res, nil } // Compact any blocks that have >5% tombstones. @@ -173,7 +171,7 @@ func (c *compactor) Plan() ([][]string, error) { } if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5% - return [][]string{{dms[i].dir}}, nil + return []string{dms[i].dir}, nil } } @@ -182,7 +180,7 @@ func (c *compactor) Plan() ([][]string, error) { // selectDirs returns the dir metas that should be compacted into a single new block. // If only a single block range is configured, the result is always nil. -func (c *compactor) selectDirs(ds []dirMeta) []dirMeta { +func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { if len(c.opts.blockRanges) < 2 || len(ds) < 1 { return nil } @@ -261,18 +259,18 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { sources := map[ulid.ULID]struct{}{} for _, b := range blocks { - if b.Compaction.Generation > res.Compaction.Generation { - res.Compaction.Generation = b.Compaction.Generation + if b.Compaction.Level > res.Compaction.Level { + res.Compaction.Level = b.Compaction.Level } for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } // If it's an in memory block, its ULID goes into the sources. - if b.Compaction.Generation == 0 { + if b.Compaction.Level == 0 { sources[b.ULID] = struct{}{} } } - res.Compaction.Generation++ + res.Compaction.Level++ for s := range sources { res.Compaction.Sources = append(res.Compaction.Sources, s) @@ -284,11 +282,13 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { return res } -func (c *compactor) Compact(dirs ...string) (err error) { +// Compact creates a new block in the compactor's directory from the blocks in the +// provided directories. +func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { var blocks []Block for _, d := range dirs { - b, err := newPersistedBlock(d) + b, err := newPersistedBlock(d, c.opts.chunkPool) if err != nil { return err } @@ -300,24 +300,24 @@ func (c *compactor) Compact(dirs ...string) (err error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(uid, blocks...) + return c.write(dest, uid, blocks...) } -func (c *compactor) Write(b Block) error { +func (c *LeveledCompactor) Write(dest string, b Block) error { // Buffering blocks might have been created that often have no data. if b.Meta().Stats.NumSeries == 0 { - return errors.Wrap(os.RemoveAll(b.Dir()), "remove empty block") + return nil } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) - return c.write(uid, b) + return c.write(dest, uid, b) } // write creates a new block that is the union of the provided blocks into dir. // It cleans up all files of the old blocks after completing successfully. -func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { +func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) { c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks)) defer func(t time.Time) { @@ -328,7 +328,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - dir := filepath.Join(c.dir, uid.String()) + dir := filepath.Join(dest, uid.String()) tmp := dir + ".tmp" if err = os.RemoveAll(tmp); err != nil { @@ -350,7 +350,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { return errors.Wrap(err, "open index writer") } - meta, err := populateBlock(blocks, indexw, chunkw) + meta, err := c.populateBlock(blocks, indexw, chunkw) if err != nil { return errors.Wrap(err, "write compaction") } @@ -376,11 +376,6 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { if err := renameFile(tmp, dir); err != nil { return errors.Wrap(err, "rename block dir") } - for _, b := range blocks { - if err := os.RemoveAll(b.Dir()); err != nil { - return err - } - } // Properly sync parent dir to ensure changes are visible. df, err := fileutil.OpenDir(dir) if err != nil { @@ -397,7 +392,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { +func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var ( set compactionSet metas []BlockMeta @@ -474,7 +469,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo } } } - if err := chunkw.WriteChunks(chks...); err != nil { return nil, err } @@ -489,6 +483,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } + for _, chk := range chks { + c.opts.chunkPool.Put(chk.Chunk) + } + for _, l := range lset { valset, ok := values[l.Name] if !ok { @@ -497,7 +495,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo } valset.set(l.Value) - postings.add(i, term{name: l.Name, value: l.Value}) + t := term{name: l.Name, value: l.Value} + + postings.add(i, t) } i++ } @@ -536,7 +536,7 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo type compactionSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta, intervals) + At() (labels.Labels, []ChunkMeta, intervals) Err() error } @@ -548,7 +548,7 @@ type compactionSeriesSet struct { series SeriesSet l labels.Labels - c []*ChunkMeta + c []ChunkMeta intervals intervals err error } @@ -574,7 +574,7 @@ func (c *compactionSeriesSet) Next() bool { // Remove completely deleted chunks. if len(c.intervals) > 0 { - chks := make([]*ChunkMeta, 0, len(c.c)) + chks := make([]ChunkMeta, 0, len(c.c)) for _, chk := range c.c { if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { chks = append(chks, chk) @@ -584,7 +584,9 @@ func (c *compactionSeriesSet) Next() bool { c.c = chks } - for _, chk := range c.c { + for i := range c.c { + chk := &c.c[i] + chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) if c.err != nil { return false @@ -601,7 +603,7 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { +func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) { return c.l, c.c, c.intervals } @@ -610,7 +612,7 @@ type compactionMerger struct { aok, bok bool l labels.Labels - c []*ChunkMeta + c []ChunkMeta intervals intervals } @@ -651,7 +653,7 @@ func (c *compactionMerger) Next() bool { // While advancing child iterators the memory used for labels and chunks // may be reused. When picking a series we have to store the result. var lset labels.Labels - var chks []*ChunkMeta + var chks []ChunkMeta d := c.compare() // Both sets contain the current series. Chain them into a single one. @@ -681,6 +683,7 @@ func (c *compactionMerger) Next() bool { c.aok = c.a.Next() c.bok = c.b.Next() } + return true } @@ -691,7 +694,7 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []*ChunkMeta, intervals) { +func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, intervals) { return c.l, c.c, c.intervals } diff --git a/compact_test.go b/compact_test.go index 91a2812ca..d9791a080 100644 --- a/compact_test.go +++ b/compact_test.go @@ -19,8 +19,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestCompactionSelect(t *testing.T) { - opts := &compactorOptions{ +func TestLeveledCompactor_Select(t *testing.T) { + opts := &LeveledCompactorOptions{ blockRanges: []int64{ 20, 60, @@ -173,7 +173,7 @@ func TestCompactionSelect(t *testing.T) { }, } - c := &compactor{ + c := &LeveledCompactor{ opts: opts, } sliceDirs := func(dms []dirMeta) [][]string { diff --git a/db.go b/db.go index 928e8e9e9..8d581cdfa 100644 --- a/db.go +++ b/db.go @@ -37,6 +37,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" ) @@ -95,9 +96,10 @@ type DB struct { dir string lockf *lockfile.Lockfile - logger log.Logger - metrics *dbMetrics - opts *Options + logger log.Logger + metrics *dbMetrics + opts *Options + chunkPool chunks.Pool // Mutex for that must be held when modifying the general block layout. mtx sync.RWMutex @@ -203,6 +205,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), compactionsEnabled: true, + chunkPool: chunks.NewPool(), } db.metrics = newDBMetrics(db, r) @@ -221,8 +224,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = &lockf } - copts := &compactorOptions{ + copts := &LeveledCompactorOptions{ blockRanges: opts.BlockRanges, + chunkPool: db.chunkPool, } if len(copts.blockRanges) == 0 { @@ -238,7 +242,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1] } - db.compactor = newCompactor(dir, r, l, copts) + db.compactor = NewLeveledCompactor(r, l, copts) if err := db.reloadBlocks(); err != nil { return nil, err @@ -386,20 +390,24 @@ func (db *DB) compact() (changes bool, err error) { default: } - if err = db.compactor.Write(h); err != nil { + if err = db.compactor.Write(db.dir, h); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true + + if err := os.RemoveAll(h.Dir()); err != nil { + return changes, errors.Wrap(err, "delete compacted head block") + } runtime.GC() } // Check for compactions of multiple blocks. for { - plans, err := db.compactor.Plan() + plan, err := db.compactor.Plan(db.dir) if err != nil { return changes, errors.Wrap(err, "plan compaction") } - if len(plans) == 0 { + if len(plan) == 0 { break } @@ -409,17 +417,17 @@ func (db *DB) compact() (changes bool, err error) { default: } - // We just execute compactions sequentially to not cause too extreme - // CPU and memory spikes. - // TODO(fabxc): return more descriptive plans in the future that allow - // estimation of resource usage and conditional parallelization? - for _, p := range plans { - if err := db.compactor.Compact(p...); err != nil { - return changes, errors.Wrapf(err, "compact %s", p) - } - changes = true - runtime.GC() + if err := db.compactor.Compact(db.dir, plan...); err != nil { + return changes, errors.Wrapf(err, "compact %s", plan) } + changes = true + + for _, pd := range plan { + if err := os.RemoveAll(pd); err != nil { + return changes, errors.Wrap(err, "delete compacted block") + } + } + runtime.GC() } return changes, nil @@ -505,10 +513,10 @@ func (db *DB) reloadBlocks() (err error) { b, ok := db.getBlock(meta.ULID) if !ok { - if meta.Compaction.Generation == 0 { + if meta.Compaction.Level == 0 { b, err = db.openHeadBlock(dir) } else { - b, err = newPersistedBlock(dir) + b, err = newPersistedBlock(dir, db.chunkPool) } if err != nil { return errors.Wrapf(err, "open block %s", dir) @@ -534,7 +542,7 @@ func (db *DB) reloadBlocks() (err error) { db.heads = nil for _, b := range blocks { - if b.Meta().Compaction.Generation == 0 { + if b.Meta().Compaction.Level == 0 { db.heads = append(db.heads, b.(*HeadBlock)) } } @@ -603,6 +611,9 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { + if dir == db.dir { + return errors.Errorf("cannot snapshot into base directory") + } db.cmtx.Lock() defer db.cmtx.Unlock() @@ -869,7 +880,7 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { return nil, errors.Wrap(err, "open WAL %s") } - h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal) + h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor) if err != nil { return nil, errors.Wrapf(err, "open head block %s", dir) } diff --git a/head.go b/head.go index 9e99d3777..045378d9c 100644 --- a/head.go +++ b/head.go @@ -52,9 +52,10 @@ var ( // HeadBlock handles reads and writes of time series data within a time window. type HeadBlock struct { - mtx sync.RWMutex - dir string - wal WAL + mtx sync.RWMutex + dir string + wal WAL + compactor Compactor activeWriters uint64 highTimestamp int64 @@ -106,7 +107,7 @@ func TouchHeadBlock(dir string, mint, maxt int64) (string, error) { } // OpenHeadBlock opens the head block in dir. -func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { +func OpenHeadBlock(dir string, l log.Logger, wal WAL, c Compactor) (*HeadBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -115,6 +116,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { h := &HeadBlock{ dir: dir, wal: wal, + compactor: c, series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, @@ -266,68 +268,14 @@ Outer: } // Snapshot persists the current state of the headblock to the given directory. -// TODO(gouthamve): Snapshot must be called when there are no active appenders. -// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should -// be removed in the future. +// Callers must ensure that there are no active appenders against the block. +// DB does this by acquiring its own write lock. func (h *HeadBlock) Snapshot(snapshotDir string) error { if h.meta.Stats.NumSeries == 0 { return nil } - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) - - dir := filepath.Join(snapshotDir, uid.String()) - tmp := dir + ".tmp" - - if err := os.RemoveAll(tmp); err != nil { - return err - } - - if err := os.MkdirAll(tmp, 0777); err != nil { - return err - } - - // Populate chunk and index files into temporary directory with - // data of all blocks. - chunkw, err := newChunkWriter(chunkDir(tmp)) - if err != nil { - return errors.Wrap(err, "open chunk writer") - } - indexw, err := newIndexWriter(tmp) - if err != nil { - return errors.Wrap(err, "open index writer") - } - - meta, err := populateBlock([]Block{h}, indexw, chunkw) - if err != nil { - return errors.Wrap(err, "write snapshot") - } - meta.ULID = uid - meta.MaxTime = h.highTimestamp - - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - - if err = chunkw.Close(); err != nil { - return errors.Wrap(err, "close chunk writer") - } - if err = indexw.Close(); err != nil { - return errors.Wrap(err, "close index writer") - } - - // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil { - return errors.Wrap(err, "write new tombstones file") - } - - // Block successfully written, make visible - if err := renameFile(tmp, dir); err != nil { - return errors.Wrap(err, "rename block dir") - } - - return nil + return h.compactor.Write(snapshotDir, h) } // Dir returns the directory of the block. @@ -702,7 +650,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { } // Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { +func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { h.mtx.RLock() defer h.mtx.RUnlock() @@ -722,7 +670,7 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*Chunk *chks = (*chks)[:0] for i, c := range s.chunks { - *chks = append(*chks, &ChunkMeta{ + *chks = append(*chks, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: (uint64(ref) << 32) | uint64(i), diff --git a/head_test.go b/head_test.go index c86c40768..eee730774 100644 --- a/head_test.go +++ b/head_test.go @@ -43,7 +43,7 @@ func openTestHeadBlock(t testing.TB, dir string) *HeadBlock { wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) require.NoError(t, err) - h, err := OpenHeadBlock(dir, nil, wal) + h, err := OpenHeadBlock(dir, nil, wal, nil) require.NoError(t, err) return h } diff --git a/index.go b/index.go index c948ee27c..e3cce3c00 100644 --- a/index.go +++ b/index.go @@ -45,8 +45,8 @@ const compactionPageBytes = minSectorSize * 64 type indexWriterSeries struct { labels labels.Labels - chunks []*ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference + chunks []ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference } type indexWriterSeriesSlice []*indexWriterSeries @@ -100,7 +100,7 @@ type IndexWriter interface { // their labels. // The reference numbers are used to resolve entries in postings lists that // are added later. - AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error + AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. @@ -261,7 +261,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.buf1.get()) } -func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { +func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err } @@ -471,6 +471,7 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { for _, r := range refs { w.buf2.putBE32(r) } + w.uint32s = refs w.buf1.reset() w.buf1.putBE32int(w.buf2.len()) @@ -524,7 +525,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error + Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. LabelIndices() ([][]string, error) @@ -740,7 +741,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil } -func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error { +func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { d1 := r.decbufAt(int(ref)) d2 := d1.decbuf(int(d1.uvarint())) @@ -781,7 +782,7 @@ func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta return errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - *chks = append(*chks, &ChunkMeta{ + *chks = append(*chks, ChunkMeta{ Ref: off, MinTime: mint, MaxTime: maxt, diff --git a/index_test.go b/index_test.go index 3616daf30..ab45ffccc 100644 --- a/index_test.go +++ b/index_test.go @@ -29,7 +29,7 @@ import ( type series struct { l labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta } type mockIndex struct { @@ -52,7 +52,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) { return m.symbols, nil } -func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error { +func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error { if _, ok := m.series[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } @@ -64,9 +64,8 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) s := series{l: l} // Actual chunk data is not stored in the index. for _, c := range chunks { - cc := *c - cc.Chunk = nil - s.chunks = append(s.chunks, &cc) + c.Chunk = nil + s.chunks = append(s.chunks, c) } m.series[ref] = s @@ -126,7 +125,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings { return newListPostings(ep) } -func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error { +func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error { s, ok := m.series[ref] if !ok { return ErrNotFound @@ -215,7 +214,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, err) var l labels.Labels - var c []*ChunkMeta + var c []ChunkMeta for i := 0; p.Next(); i++ { err := ir.Series(p.At(), &l, &c) @@ -252,10 +251,10 @@ func TestPersistence_index_e2e(t *testing.T) { // Generate ChunkMetas for every label set. for i, lset := range lbls { - var metas []*ChunkMeta + var metas []ChunkMeta for j := 0; j <= (i % 20); j++ { - metas = append(metas, &ChunkMeta{ + metas = append(metas, ChunkMeta{ MinTime: int64(j * 10000), MaxTime: int64((j + 1) * 10000), Ref: rand.Uint64(), @@ -333,7 +332,7 @@ func TestPersistence_index_e2e(t *testing.T) { expp, err := mi.Postings(p.name, p.value) var lset, explset labels.Labels - var chks, expchks []*ChunkMeta + var chks, expchks []ChunkMeta for gotp.Next() { require.True(t, expp.Next()) diff --git a/querier.go b/querier.go index a54acdd5a..8c2f6cbee 100644 --- a/querier.go +++ b/querier.go @@ -403,7 +403,7 @@ func (s *mergedSeriesSet) Next() bool { type chunkSeriesSet interface { Next() bool - At() (labels.Labels, []*ChunkMeta, intervals) + At() (labels.Labels, []ChunkMeta, intervals) Err() error } @@ -416,12 +416,12 @@ type baseChunkSeries struct { absent []string // labels that must be unset in results. lset labels.Labels - chks []*ChunkMeta + chks []ChunkMeta intervals intervals err error } -func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { +func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) { return s.lset, s.chks, s.intervals } @@ -430,7 +430,7 @@ func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { var ( lset labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta ) Outer: for s.p.Next() { @@ -453,7 +453,7 @@ Outer: if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. - chks := make([]*ChunkMeta, 0, len(s.chks)) + chks := make([]ChunkMeta, 0, len(s.chks)) for _, chk := range s.chks { if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { chks = append(chks, chk) @@ -480,12 +480,12 @@ type populatedChunkSeries struct { mint, maxt int64 err error - chks []*ChunkMeta + chks []ChunkMeta lset labels.Labels intervals intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) { +func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) { return s.lset, s.chks, s.intervals } func (s *populatedChunkSeries) Err() error { return s.err } @@ -501,8 +501,10 @@ func (s *populatedChunkSeries) Next() bool { chks = chks[1:] } - // Break out at the first chunk that has no overlap with mint, maxt. - for i, c := range chks { + for i := range chks { + c := &chks[i] + + // Break out at the first chunk that has no overlap with mint, maxt. if c.MinTime > s.maxt { chks = chks[:i] break @@ -564,7 +566,7 @@ func (s *blockSeriesSet) Err() error { return s.err } // time series data. type chunkSeries struct { labels labels.Labels - chunks []*ChunkMeta // in-order chunk refs + chunks []ChunkMeta // in-order chunk refs mint, maxt int64 @@ -667,7 +669,7 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - chunks []*ChunkMeta + chunks []ChunkMeta i int cur chunks.Iterator @@ -677,7 +679,7 @@ type chunkSeriesIterator struct { intervals intervals } -func newChunkSeriesIterator(cs []*ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator { it := cs[0].Chunk.Iterator() if len(dranges) > 0 { it = &deletedIterator{it: it, intervals: dranges} diff --git a/querier_test.go b/querier_test.go index d8e8f7657..7c87f8318 100644 --- a/querier_test.go +++ b/querier_test.go @@ -235,12 +235,12 @@ func createIdxChkReaders(tc []struct { for i, s := range tc { i = i + 1 // 0 is not a valid posting. - metas := make([]*ChunkMeta, 0, len(s.chunks)) + metas := make([]ChunkMeta, 0, len(s.chunks)) for _, chk := range s.chunks { // Collisions can be there, but for tests, its fine. ref := rand.Uint64() - metas = append(metas, &ChunkMeta{ + metas = append(metas, ChunkMeta{ MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t, Ref: ref, @@ -661,7 +661,7 @@ Outer: func TestBaseChunkSeries(t *testing.T) { type refdSeries struct { lset labels.Labels - chunks []*ChunkMeta + chunks []ChunkMeta ref uint32 } @@ -677,7 +677,7 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: labels.New([]labels.Label{{"a", "a"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, {Ref: 121}, }, @@ -685,19 +685,19 @@ func TestBaseChunkSeries(t *testing.T) { }, { lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []*ChunkMeta{{Ref: 8282}}, + chunks: []ChunkMeta{{Ref: 8282}}, ref: 1, }, { lset: labels.New([]labels.Label{{"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, }, ref: 108, @@ -711,14 +711,14 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []*ChunkMeta{ + chunks: []ChunkMeta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []*ChunkMeta{{Ref: 8282}}, + chunks: []ChunkMeta{{Ref: 8282}}, ref: 1, }, }, @@ -766,7 +766,7 @@ type itSeries struct { func (s itSeries) Iterator() SeriesIterator { return s.si } func (s itSeries) Labels() labels.Labels { return labels.Labels{} } -func chunkFromSamples(s []sample) *ChunkMeta { +func chunkFromSamples(s []sample) ChunkMeta { mint, maxt := int64(0), int64(0) if len(s) > 0 { @@ -779,11 +779,10 @@ func chunkFromSamples(s []sample) *ChunkMeta { for _, s := range s { ca.Append(s.t, s.v) } - return &ChunkMeta{ + return ChunkMeta{ MinTime: mint, MaxTime: maxt, - - Chunk: c, + Chunk: c, } } @@ -945,7 +944,7 @@ func TestSeriesIterator(t *testing.T) { t.Run("Chunk", func(t *testing.T) { for _, tc := range itcases { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1016,7 +1015,7 @@ func TestSeriesIterator(t *testing.T) { seekcases2 := append(seekcases, extra...) for _, tc := range seekcases2 { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1103,7 +1102,7 @@ func TestSeriesIterator(t *testing.T) { // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { - chkMetas := []*ChunkMeta{ + chkMetas := []ChunkMeta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), chunkFromSamples([]sample{{4, 4}, {5, 5}}), @@ -1120,7 +1119,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { // Regression when seeked chunks were still found via binary search and we always // skipped to the end when seeking a value in the current chunk. func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { - metas := []*ChunkMeta{ + metas := []ChunkMeta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}), chunkFromSamples([]sample{}), @@ -1141,7 +1140,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} - chunkMetas := [][]*ChunkMeta{ + chunkMetas := [][]ChunkMeta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, {MinTime: 3, MaxTime: 4, Ref: 2}, @@ -1173,7 +1172,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { require.False(t, p.Next()) // Test the case where 1 chunk could cause an unpopulated chunk to be returned. - chunkMetas = [][]*ChunkMeta{ + chunkMetas = [][]ChunkMeta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, }, @@ -1193,7 +1192,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { type mockChunkSeriesSet struct { l []labels.Labels - cm [][]*ChunkMeta + cm [][]ChunkMeta i int } @@ -1206,7 +1205,7 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []*ChunkMeta, intervals) { +func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) { return m.l[m.i], m.cm[m.i], nil }