diff --git a/CHANGELOG.md b/CHANGELOG.md
index af44615e7..a44471681 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 ## master / unreleased
 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files. For example, using tmpfs on an RPi to minimise SD card wear-out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
+- [FEATURE] Size-based retention through `Options.MaxBytes`. As part of this change:
+  - added new metrics: `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
+  - new public interface `SizeReader: Size() int64`
+  - `OpenBlock` signature changed to take a logger.
 - [REMOVED] `PrefixMatcher` is considered unused so was removed.
 - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
diff --git a/block.go b/block.go
index e5a66bd9e..837f4a763 100644
--- a/block.go
+++ b/block.go
@@ -21,6 +21,8 @@ import (
 	"path/filepath"
 	"sync"
 
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/chunkenc"
@@ -140,6 +142,12 @@ type Appendable interface {
 	Appender() Appender
 }
 
+// SizeReader returns the size of the object in bytes.
+type SizeReader interface {
+	// Size returns the size in bytes.
+	Size() int64
+}
+
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
 	// Unique identifier for the block and its contents. Changes on compaction.
@@ -166,6 +174,7 @@ type BlockStats struct {
 	NumSeries     uint64 `json:"numSeries,omitempty"`
 	NumChunks     uint64 `json:"numChunks,omitempty"`
 	NumTombstones uint64 `json:"numTombstones,omitempty"`
+	NumBytes      int64  `json:"numBytes,omitempty"`
 }
 
 // BlockDesc describes a block by ULID and time range.
@@ -257,7 +266,10 @@ type Block struct {
 
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
-func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
+func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
 	meta, err := readMetaFile(dir)
 	if err != nil {
 		return nil, err
@@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
 		return nil, err
 	}
 
-	tr, err := readTombstones(dir)
+	tr, tsr, err := readTombstones(dir)
 	if err != nil {
 		return nil, err
 	}
 
+	// TODO: refactor to set this at block creation time, as
+	// that would be the logical place for a block size to be calculated.
+	bs := blockSize(cr, ir, tsr)
+	meta.Stats.NumBytes = bs
+	err = writeMetaFile(dir, meta)
+	if err != nil {
+		level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
+	}
+
 	pb := &Block{
 		dir:        dir,
 		meta:       *meta,
@@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
 	return pb, nil
 }
 
+func blockSize(rr ...SizeReader) int64 {
+	var total int64
+	for _, r := range rr {
+		if r != nil {
+			total += r.Size()
+		}
+	}
+	return total
+}
+
 // Close closes the on-disk block. It blocks as long as there are readers reading from the block.
 func (pb *Block) Close() error {
 	pb.mtx.Lock()
@@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir }
 
 // Meta returns meta information about the block.
 func (pb *Block) Meta() BlockMeta { return pb.meta }
 
+// Size returns the number of bytes that the block takes up.
+func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
+
 // ErrClosing is returned when a block is in the process of being closed.
 var ErrClosing = errors.New("block is closing")
 
diff --git a/block_test.go b/block_test.go
index cf8716062..789aebaa7 100644
--- a/block_test.go
+++ b/block_test.go
@@ -46,14 +46,14 @@ func TestSetCompactionFailed(t *testing.T) {
 	defer os.RemoveAll(tmpdir)
 
 	blockDir := createBlock(t, tmpdir, 0, 0, 0)
-	b, err := OpenBlock(blockDir, nil)
+	b, err := OpenBlock(nil, blockDir, nil)
 	testutil.Ok(t, err)
 	testutil.Equals(t, false, b.meta.Compaction.Failed)
 	testutil.Ok(t, b.setCompactionFailed())
 	testutil.Equals(t, true, b.meta.Compaction.Failed)
 	testutil.Ok(t, b.Close())
 
-	b, err = OpenBlock(blockDir, nil)
+	b, err = OpenBlock(nil, blockDir, nil)
 	testutil.Ok(t, err)
 	testutil.Equals(t, true, b.meta.Compaction.Failed)
 	testutil.Ok(t, b.Close())
diff --git a/chunks/chunks.go b/chunks/chunks.go
index 569aeddc2..8fb288384 100644
--- a/chunks/chunks.go
+++ b/chunks/chunks.go
@@ -285,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
 // Reader implements a SeriesReader for a serialized byte stream
 // of series data.
 type Reader struct {
-	// The underlying bytes holding the encoded series data.
-	bs []ByteSlice
-
-	// Closers for resources behind the byte slices.
-	cs []io.Closer
-
+	bs   []ByteSlice // The underlying bytes holding the encoded series data.
+	cs   []io.Closer // Closers for resources behind the byte slices.
+	size int64       // The total size in bytes of the data in the reader.
 	pool chunkenc.Pool
 }
 
 func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
 	cr := Reader{pool: pool, bs: bs, cs: cs}
+	var totalSize int64
 
 	for i, b := range cr.bs {
 		if b.Len() < 4 {
@@ -305,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
 		if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
 			return nil, errors.Errorf("invalid magic number %x", m)
 		}
+		totalSize += int64(b.Len())
 	}
+	cr.size = totalSize
 	return &cr, nil
 }
 
@@ -328,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
 		pool = chunkenc.NewPool()
 	}
 
-	var bs []ByteSlice
-	var cs []io.Closer
-
+	var (
+		bs []ByteSlice
+		cs []io.Closer
+	)
 	for _, fn := range files {
 		f, err := fileutil.OpenMmapFile(fn)
 		if err != nil {
@@ -346,6 +347,11 @@ func (s *Reader) Close() error {
 	return closeAll(s.cs...)
 }
 
+// Size returns the size of the chunks.
+func (s *Reader) Size() int64 {
+	return s.size
+}
+
 func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
 	var (
 		seq = int(ref >> 32)
diff --git a/compact.go b/compact.go
index f8e6ff545..49d4e5868 100644
--- a/compact.go
+++ b/compact.go
@@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
 
 		if b == nil {
 			var err error
-			b, err = OpenBlock(d, c.chunkPool)
+			b, err = OpenBlock(c.logger, d, c.chunkPool)
 			if err != nil {
 				return uid, err
 			}
diff --git a/db.go b/db.go
index 43cfadd46..630700439 100644
--- a/db.go
+++ b/db.go
@@ -58,6 +58,13 @@ type Options struct {
 	// Duration of persisted data to keep.
 	RetentionDuration uint64
 
+	// Maximum number of bytes in blocks to be retained.
+	// 0 or less means disabled.
+	// NOTE: For proper storage calculations, the size of the WAL directory
+	// also needs to be considered, as it is not included when calculating
+	// the current size of the database.
+	MaxBytes int64
+
 	// The sizes of the Blocks.
 	BlockRanges []int64
 
@@ -127,11 +134,12 @@ type dbMetrics struct {
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
+	timeRetentionCount   prometheus.Counter
 	compactionsSkipped   prometheus.Counter
-	cutoffs              prometheus.Counter
-	cutoffsFailed        prometheus.Counter
 	startTime            prometheus.GaugeFunc
 	tombCleanTimer       prometheus.Histogram
+	blocksBytes          prometheus.Gauge
+	sizeRetentionCount   prometheus.Counter
 }
 
 func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
+	m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "prometheus_tsdb_time_retentions_total",
+		Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
+	})
 	m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_compactions_skipped_total",
 		Help: "Total number of skipped compactions due to disabled auto compaction.",
 	})
-	m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "prometheus_tsdb_retention_cutoffs_total",
-		Help: "Number of times the database cut off block data from disk.",
-	})
-	m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: "prometheus_tsdb_retention_cutoffs_failures_total",
-		Help: "Number of times the database failed to cut off block data from disk.",
-	})
 	m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "prometheus_tsdb_lowest_timestamp",
 		Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.",
@@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_tombstone_cleanup_seconds",
 		Help: "The time taken to recompact blocks to remove tombstones.",
 	})
+	m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "prometheus_tsdb_storage_blocks_bytes_total",
+		Help: "The number of bytes that are currently used for local storage by all blocks.",
+	})
+	m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "prometheus_tsdb_size_retentions_total",
+		Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 			m.symbolTableSize,
 			m.reloads,
 			m.reloadsFailed,
-			m.cutoffs,
-			m.cutoffsFailed,
+			m.timeRetentionCount,
 			m.compactionsTriggered,
 			m.startTime,
 			m.tombCleanTimer,
+			m.blocksBytes,
+			m.sizeRetentionCount,
 		)
 	}
 	return m
@@ -340,25 +353,6 @@ func (db *DB) run() {
 	}
 }
 
-func (db *DB) beyondRetention(meta *BlockMeta) bool {
-	if db.opts.RetentionDuration == 0 {
-		return false
-	}
-
-	db.mtx.RLock()
-	blocks := db.blocks[:]
-	db.mtx.RUnlock()
-
-	if len(blocks) == 0 {
-		return false
-	}
-
-	last := blocks[len(db.blocks)-1]
-	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
-
-	return meta.MaxTime < mint
-}
-
 // Appender opens a new appender against the database.
 func (db *DB) Appender() Appender {
 	return dbAppender{db: db, Appender: db.head.Appender()}
@@ -474,8 +468,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
 	return nil, false
 }
 
-// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
-// a list of block directories which should be deleted during reload.
+// reload blocks and trigger head truncation if new blocks appeared.
 // Blocks that are obsolete due to replacement or retention will be deleted.
 func (db *DB) reload() (err error) {
 	defer func() {
@@ -485,112 +478,187 @@
 		db.metrics.reloads.Inc()
 	}()
 
-	dirs, err := blockDirs(db.dir)
+	loadable, corrupted, err := db.openBlocks()
 	if err != nil {
-		return errors.Wrap(err, "find blocks")
+		return err
 	}
-	// We delete old blocks that have been superseded by new ones by gathering all parents
-	// from existing blocks. Those parents all have newer replacements and can be safely deleted
-	// after we loaded the other blocks.
-	// This makes us resilient against the process crashing towards the end of a compaction.
-	// Creation of a new block and deletion of its parents cannot happen atomically. By creating
-	// blocks with their parents, we can pick up the deletion where it left off during a crash.
-	var (
-		blocks     []*Block
-		corrupted  = map[ulid.ULID]error{}
-		opened     = map[ulid.ULID]struct{}{}
-		deleteable = map[ulid.ULID]struct{}{}
-	)
-	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
-		if err != nil {
-			// The block was potentially in the middle of being deleted during a crash.
-			// Skip it since we may delete it properly further down again.
-			level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
-
-			ulid, err2 := ulid.Parse(filepath.Base(dir))
-			if err2 != nil {
-				level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
-				continue
-			}
-			corrupted[ulid] = err
-			continue
-		}
-		if db.beyondRetention(meta) {
-			deleteable[meta.ULID] = struct{}{}
-			continue
-		}
-		for _, b := range meta.Compaction.Parents {
-			deleteable[b.ULID] = struct{}{}
+
+	deletable := db.deletableBlocks(loadable)
+
+	// Corrupted blocks that are listed as parents of loadable blocks have already
+	// been replaced by compaction, so they can be safely ignored and deleted.
+	// This makes it resilient against the process crashing towards the end of a compaction.
+	// Creation of a new block and deletion of its parents cannot happen atomically.
+	// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
+	for _, block := range loadable {
+		for _, b := range block.Meta().Compaction.Parents {
+			delete(corrupted, b.ULID)
+			deletable[b.ULID] = nil
 		}
 	}
-
-	// Blocks we failed to open should all be those we are want to delete anyway.
-	for c, err := range corrupted {
-		if _, ok := deleteable[c]; !ok {
-			return errors.Wrapf(err, "unexpected corrupted block %s", c)
-		}
+	if len(corrupted) > 0 {
+		return errors.Errorf("unexpected corrupted blocks %v", corrupted)
 	}
-	// Load new blocks into memory.
-	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
-		if err != nil {
-			return errors.Wrapf(err, "read meta information %s", dir)
-		}
-		// Don't load blocks that are scheduled for deletion.
-		if _, ok := deleteable[meta.ULID]; ok {
+
+	// Blocks scheduled for deletion must not be loaded.
+	var (
+		bb         []*Block
+		blocksSize int64
+	)
+	for _, block := range loadable {
+		if _, ok := deletable[block.Meta().ULID]; ok {
+			deletable[block.Meta().ULID] = block
 			continue
 		}
-		// See if we already have the block in memory or open it otherwise.
-		b, ok := db.getBlock(meta.ULID)
-		if !ok {
-			b, err = OpenBlock(dir, db.chunkPool)
-			if err != nil {
-				return errors.Wrapf(err, "open block %s", dir)
-			}
-		}
-
-		blocks = append(blocks, b)
-		opened[meta.ULID] = struct{}{}
+		bb = append(bb, block)
+		blocksSize += block.Size()
 	}
 
-	sort.Slice(blocks, func(i, j int) bool {
-		return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
+	loadable = bb
+	db.metrics.blocksBytes.Set(float64(blocksSize))
+
+	sort.Slice(loadable, func(i, j int) bool {
+		return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime
 	})
-	if err := validateBlockSequence(blocks); err != nil {
+	if err := validateBlockSequence(loadable); err != nil {
 		return errors.Wrap(err, "invalid block sequence")
 	}
 
-	// Swap in new blocks first for subsequently created readers to be seen.
-	// Then close previous blocks, which may block for pending readers to complete.
+	// Swap new blocks first for subsequently created readers to be seen.
 	db.mtx.Lock()
 	oldBlocks := db.blocks
-	db.blocks = blocks
+	db.blocks = loadable
 	db.mtx.Unlock()
 
-	// Drop old blocks from memory.
 	for _, b := range oldBlocks {
-		if _, ok := opened[b.Meta().ULID]; ok {
-			continue
-		}
-		if err := b.Close(); err != nil {
-			level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
+		if _, ok := deletable[b.Meta().ULID]; ok {
+			deletable[b.Meta().ULID] = b
 		}
 	}
-	// Delete all obsolete blocks. None of them are opened any longer.
-	for ulid := range deleteable {
-		if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
-			return errors.Wrapf(err, "delete obsolete block %s", ulid)
-		}
+
+	if err := db.deleteBlocks(deletable); err != nil {
+		return err
 	}
 
 	// Garbage collect data in the head if the most recent persisted block
 	// covers data of its current time range.
-	if len(blocks) == 0 {
+	if len(loadable) == 0 {
 		return nil
 	}
-	maxt := blocks[len(blocks)-1].Meta().MaxTime
+
+	maxt := loadable[len(loadable)-1].Meta().MaxTime
 
 	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
 }
 
+func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+	dirs, err := blockDirs(db.dir)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "find blocks")
+	}
+
+	corrupted = make(map[ulid.ULID]error)
+	for _, dir := range dirs {
+		meta, err := readMetaFile(dir)
+		if err != nil {
+			level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
+			continue
+		}
+
+		// See if we already have the block in memory or open it otherwise.
+		block, ok := db.getBlock(meta.ULID)
+		if !ok {
+			block, err = OpenBlock(db.logger, dir, db.chunkPool)
+			if err != nil {
+				corrupted[meta.ULID] = err
+				continue
+			}
+		}
+		blocks = append(blocks, block)
+	}
+	return blocks, corrupted, nil
+}
+
+// deletableBlocks returns all blocks past the retention policy.
+func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
+	deletable := make(map[ulid.ULID]*Block)
+
+	// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
+	// This ensures that the retentions will remove the oldest blocks.
+	sort.Slice(blocks, func(i, j int) bool {
+		return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
+	})
+
+	for ulid, block := range db.beyondTimeRetention(blocks) {
+		deletable[ulid] = block
+	}
+
+	for ulid, block := range db.beyondSizeRetention(blocks) {
+		deletable[ulid] = block
+	}
+
+	return deletable
+}
+
+func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
+	// Time retention is disabled or there are no blocks to work with.
+	if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
+		return
+	}
+
+	deletable = make(map[ulid.ULID]*Block)
+	for i, block := range blocks {
+		// The difference between the first block and this block is larger than
+		// the retention period, so any blocks after that are added as deletable.
+		if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) {
+			for _, b := range blocks[i:] {
+				deletable[b.meta.ULID] = b
+			}
+			db.metrics.timeRetentionCount.Inc()
+			break
+		}
+	}
+	return deletable
+}
+
+func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
+	// Size retention is disabled or there are no blocks to work with.
+	if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
+		return
+	}
+
+	deletable = make(map[ulid.ULID]*Block)
+	blocksSize := int64(0)
+	for i, block := range blocks {
+		blocksSize += block.Size()
+		if blocksSize > db.opts.MaxBytes {
+			// Add this and all following blocks for deletion.
+			for _, b := range blocks[i:] {
+				deletable[b.meta.ULID] = b
+			}
+			db.metrics.sizeRetentionCount.Inc()
+			break
+		}
+	}
+	return deletable
+}
+
+// deleteBlocks closes and deletes blocks from the disk.
+// When the map contains a non-nil block object it means the block is loaded in memory,
+// so it needs to be closed first, as it might have to wait for pending readers to complete.
+func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
+	for ulid, block := range blocks {
+		if block != nil {
+			if err := block.Close(); err != nil {
+				level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
+			}
+		}
+		if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
+			return errors.Wrapf(err, "delete obsolete block %s", ulid)
+		}
+	}
+	return nil
+}
+
 // validateBlockSequence returns an error if the given block meta files indicate that some blocks overlap within the sequence.
 func validateBlockSequence(bs []*Block) error {
 	if len(bs) <= 1 {
diff --git a/db_test.go b/db_test.go
index 92d487eec..2beef0c52 100644
--- a/db_test.go
+++ b/db_test.go
@@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) {
 	testutil.Ok(t, db.reload())
 
 	blocks := db.Blocks()
+	for _, b := range blocks {
+		b.meta.Stats.NumBytes = 0
+	}
 	testutil.Equals(t, 3, len(blocks))
 	testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
 	testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
@@ -834,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) {
 	totalBlocks := 2
 	for i := 0; i < totalBlocks; i++ {
 		blockDir := createBlock(t, db.Dir(), 0, 0, 0)
-		block, err := OpenBlock(blockDir, nil)
+		block, err := OpenBlock(nil, blockDir, nil)
 		testutil.Ok(t, err)
 		// Add some fake tombstones to trigger the compaction.
 		tomb := newMemTombstones()
@@ -877,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
 		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
 	}
 
-	block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil)
+	block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil)
 	testutil.Ok(c.t, err)
 	testutil.Ok(c.t, block.Close()) // Close the block as we won't be using it anywhere.
 	c.blocks = append(c.blocks, block)
@@ -901,59 +904,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block)
 }
 
-func TestDB_Retention(t *testing.T) {
-	db, close := openTestDB(t, nil)
-	defer close()
-
-	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
-
-	app := db.Appender()
-	_, err := app.Add(lbls, 0, 1)
-	testutil.Ok(t, err)
-	testutil.Ok(t, app.Commit())
-
-	// create snapshot to make it create a block.
-	// TODO(gouthamve): Add a method to compact headblock.
-	snap, err := ioutil.TempDir("", "snap")
-	testutil.Ok(t, err)
-
-	defer os.RemoveAll(snap)
-	testutil.Ok(t, db.Snapshot(snap, true))
-	testutil.Ok(t, db.Close())
-
-	// reopen DB from snapshot
-	db, err = Open(snap, nil, nil, nil)
-	testutil.Ok(t, err)
-
-	testutil.Equals(t, 1, len(db.blocks))
-
-	app = db.Appender()
-	_, err = app.Add(lbls, 100, 1)
-	testutil.Ok(t, err)
-	testutil.Ok(t, app.Commit())
-
-	// Snapshot again to create another block.
-	snap, err = ioutil.TempDir("", "snap")
-	testutil.Ok(t, err)
-	defer os.RemoveAll(snap)
-
-	testutil.Ok(t, db.Snapshot(snap, true))
-	testutil.Ok(t, db.Close())
-
-	// reopen DB from snapshot
-	db, err = Open(snap, nil, nil, &Options{
-		RetentionDuration: 10,
-		BlockRanges:       []int64{50},
+func TestTimeRetention(t *testing.T) {
+	db, close := openTestDB(t, &Options{
+		BlockRanges: []int64{1000},
 	})
-	testutil.Ok(t, err)
+	defer close()
 	defer db.Close()
 
-	testutil.Equals(t, 2, len(db.blocks))
+	blocks := []*BlockMeta{
+		{MinTime: 500, MaxTime: 900}, // Oldest block.
+		{MinTime: 1000, MaxTime: 1500},
+		{MinTime: 1500, MaxTime: 2000}, // Newest block.
+	}
 
-	// Reload blocks, which should drop blocks beyond the retention boundary.
+	for _, m := range blocks {
+		createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime)
+	}
+
+	testutil.Ok(t, db.reload())                       // Reload the db to register the new blocks.
+	testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
+
+	db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
 	testutil.Ok(t, db.reload())
-	testutil.Equals(t, 1, len(db.blocks))
-	testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify it's the right block.
+
+	expBlocks := blocks[1:]
+	actBlocks := db.Blocks()
+
+	testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
+	testutil.Equals(t, len(expBlocks), len(actBlocks))
+	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
+	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
+}
+
+func TestSizeRetention(t *testing.T) {
+	db, close := openTestDB(t, &Options{
+		BlockRanges: []int64{100},
+	})
+	defer close()
+	defer db.Close()
+
+	blocks := []*BlockMeta{
+		{MinTime: 100, MaxTime: 200}, // Oldest block.
+		{MinTime: 200, MaxTime: 300},
+		{MinTime: 300, MaxTime: 400},
+		{MinTime: 400, MaxTime: 500},
+		{MinTime: 500, MaxTime: 600}, // Newest block.
+	}
+
+	for _, m := range blocks {
+		createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime)
+	}
+
+	// Test that the registered size matches the actual disk size.
+	testutil.Ok(t, db.reload())                                       // Reload the db to register the new db size.
+	testutil.Equals(t, len(blocks), len(db.Blocks()))                 // Ensure all blocks are registered.
+	expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
+	actSize := dbDiskSize(db.Dir())
+	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
+
+	// Decrease the max bytes limit so that a delete is triggered.
+	// Check total size, total count and check that the oldest block was deleted.
+	firstBlockSize := db.Blocks()[0].Size()
+	sizeLimit := actSize - firstBlockSize
+	db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller than the actual size.
+	testutil.Ok(t, db.reload())  // Reload the db to register the new db size.
+
+	expBlocks := blocks[1:]
+	actBlocks := db.Blocks()
+	expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
+	actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
+	actSize = dbDiskSize(db.Dir())
+
+	testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
+	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
+	testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
+	testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
+	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
+	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
+}
+
+func dbDiskSize(dir string) int64 {
+	var statSize int64
+	filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+		// Include only index, tombstone and chunks.
+		if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
+			info.Name() == indexFilename ||
+			info.Name() == tombstoneFilename {
+			statSize += info.Size()
+		}
+		return nil
+	})
+	return statSize
+}
 
 func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
diff --git a/index/index.go b/index/index.go
index cce29d9b1..74e08d465 100644
--- a/index/index.go
+++ b/index/index.go
@@ -914,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings {
 	return p
 }
 
+// Size returns the size of the index file.
+func (r *Reader) Size() int64 {
+	return int64(r.b.Len())
+}
+
 // LabelNames returns all the unique label names present in the index.
 func (r *Reader) LabelNames() ([]string, error) {
 	labelNamesMap := make(map[string]struct{}, len(r.labels))
diff --git a/querier_test.go b/querier_test.go
index 79dfbff7a..69ca84602 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -1227,12 +1227,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
 
 func BenchmarkPersistedQueries(b *testing.B) {
 	for _, nSeries := range []int{10, 100} {
-		for _, nSamples := range []int{1000, 10000, 100000} {
+		for _, nSamples := range []int64{1000, 10000, 100000} {
 			b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) {
 				dir, err := ioutil.TempDir("", "bench_persisted")
 				testutil.Ok(b, err)
 				defer os.RemoveAll(dir)
-				block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil)
+				block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, nSamples), nil)
 				testutil.Ok(b, err)
 				defer block.Close()
diff --git a/tombstones.go b/tombstones.go
index a1f30b59c..078140406 100644
--- a/tombstones.go
+++ b/tombstones.go
@@ -113,37 +113,41 @@ type Stone struct {
 	intervals Intervals
 }
 
-func readTombstones(dir string) (TombstoneReader, error) {
+func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
 	if os.IsNotExist(err) {
-		return newMemTombstones(), nil
+		return newMemTombstones(), nil, nil
 	} else if err != nil {
-		return nil, err
+		return nil, nil, err
+	}
+
+	sr := &TombstoneFile{
+		size: int64(len(b)),
 	}
 
 	if len(b) < 5 {
-		return nil, errors.Wrap(errInvalidSize, "tombstones header")
+		return nil, sr, errors.Wrap(errInvalidSize, "tombstones header")
 	}
 
 	d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
 	if mg := d.be32(); mg != MagicTombstone {
-		return nil, fmt.Errorf("invalid magic number %x", mg)
+		return nil, sr, fmt.Errorf("invalid magic number %x", mg)
 	}
 	if flag := d.byte(); flag != tombstoneFormatV1 {
-		return nil, fmt.Errorf("invalid tombstone format %x", flag)
+		return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
 	}
 
 	if d.err() != nil {
-		return nil, d.err()
+		return nil, sr, d.err()
 	}
 
 	// Verify checksum.
 	hash := newCRC32()
 	if _, err := hash.Write(d.get()); err != nil {
-		return nil, errors.Wrap(err, "write to hash")
+		return nil, sr, errors.Wrap(err, "write to hash")
 	}
 	if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
-		return nil, errors.New("checksum did not match")
+		return nil, sr, errors.New("checksum did not match")
 	}
 
 	stonesMap := newMemTombstones()
@@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) {
 		mint := d.varint64()
 		maxt := d.varint64()
 		if d.err() != nil {
-			return nil, d.err()
+			return nil, sr, d.err()
 		}
 		stonesMap.addInterval(k, Interval{mint, maxt})
 	}
 
-	return stonesMap, nil
+	return stonesMap, sr, nil
 }
 
 type memTombstones struct {
@@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
 	}
 }
 
+// TombstoneFile holds information about the tombstone file.
+type TombstoneFile struct {
+	size int64
+}
+
+// Size returns the tombstone file size.
+func (t *TombstoneFile) Size() int64 {
+	return t.size
+}
+
 func (*memTombstones) Close() error {
 	return nil
 }
diff --git a/tombstones_test.go b/tombstones_test.go
index e12574f11..2a106d705 100644
--- a/tombstones_test.go
+++ b/tombstones_test.go
@@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
 
 	testutil.Ok(t, writeTombstoneFile(tmpdir, stones))
 
-	restr, err := readTombstones(tmpdir)
+	restr, _, err := readTombstones(tmpdir)
 	testutil.Ok(t, err)
 
 	// Compare the two readers.
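For consumers of the library, the new knob composes with time-based retention. A minimal usage sketch, assuming the `github.com/prometheus/tsdb` import path and the `Open(dir, logger, registerer, opts)` call shape used in the tests above; the `data` directory and the limit values are illustrative, and note (per the `Options` docs) that the WAL directory is not counted toward `MaxBytes`:

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
)

func main() {
	opts := &tsdb.Options{
		BlockRanges:       []int64{2 * 60 * 60 * 1000},   // 2h blocks (timestamps in milliseconds).
		RetentionDuration: 15 * 24 * 60 * 60 * 1000,      // Keep at most ~15 days of data.
		MaxBytes:          512 * 1024 * 1024,             // And at most ~512 MiB of blocks; <=0 disables.
	}
	// A nil logger and nil registerer are accepted, as in the tests;
	// with a nil registerer the new metrics simply stay unregistered.
	db, err := tsdb.Open("data", nil, nil, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```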
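Existing callers of `OpenBlock` gain a leading logger argument. Per the diff, a nil logger is replaced with a nop logger internally, and a nil chunk pool is accepted; a sketch under those assumptions (the block directory ULID is a placeholder):

```go
package main

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	// "data/01BKGV7JBM69T2G1BGBGM6KB12" stands in for a real block directory.
	b, err := tsdb.OpenBlock(log.NewNopLogger(), "data/01BKGV7JBM69T2G1BGBGM6KB12", nil)
	if err != nil {
		fmt.Println("open block:", err)
		return
	}
	defer b.Close()

	// Size is backed by meta.Stats.NumBytes, which OpenBlock computes and persists.
	fmt.Println("block size in bytes:", b.Size())
}
```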
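`SizeReader` is a one-method interface, so anything that can report an on-disk footprint can feed a `blockSize`-style aggregation. A hypothetical standalone example — `fileSize` and `totalSize` are illustrative names, not part of the patch — that mirrors the nil-tolerant summing done by `blockSize`:

```go
package main

import (
	"fmt"
	"os"

	"github.com/prometheus/tsdb"
)

// fileSize is a toy SizeReader reporting the size of a single file.
type fileSize struct{ path string }

func (f fileSize) Size() int64 {
	fi, err := os.Stat(f.path)
	if err != nil {
		return 0 // Unknown sizes simply don't count toward the total.
	}
	return fi.Size()
}

// totalSize sums the readers, skipping nil entries just as blockSize does.
func totalSize(rr ...tsdb.SizeReader) int64 {
	var total int64
	for _, r := range rr {
		if r != nil {
			total += r.Size()
		}
	}
	return total
}

func main() {
	fmt.Println(totalSize(fileSize{"index"}, fileSize{"tombstones"}, nil))
}
```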
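The selection rule in `beyondSizeRetention` can be stated compactly: order blocks newest first, accumulate their sizes, and delete from the first block that overflows the budget onward. A self-contained sketch of that rule (the `block` struct here is a stand-in for `*tsdb.Block`):

```go
package main

import (
	"fmt"
	"sort"
)

type block struct {
	maxTime int64 // Newest sample in the block.
	size    int64 // On-disk size in bytes.
}

// beyondSize returns the suffix of blocks to delete once the running size
// of newer blocks exceeds maxBytes; maxBytes <= 0 disables the check.
func beyondSize(blocks []block, maxBytes int64) []block {
	if maxBytes <= 0 || len(blocks) == 0 {
		return nil
	}
	// Newest to oldest, mirroring the sort in deletableBlocks.
	sort.Slice(blocks, func(i, j int) bool { return blocks[i].maxTime > blocks[j].maxTime })

	var used int64
	for i, b := range blocks {
		used += b.size
		if used > maxBytes {
			return blocks[i:] // This block and everything older.
		}
	}
	return nil
}

func main() {
	bb := []block{{100, 40}, {200, 40}, {300, 40}}
	fmt.Println(beyondSize(bb, 100)) // Deletes only the oldest block: [{100 40}]
}
```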