diff --git a/CHANGELOG.md b/CHANGELOG.md index a5b0d09be..12156d9df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## master / unreleased - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) + - [BUGFIX] Re-calculate block size when calling `block.Delete`. +- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/block.go b/block.go index 1b6e79d9d..bd54e297c 100644 --- a/block.go +++ b/block.go @@ -151,12 +151,6 @@ type Appendable interface { Appender() Appender } -// SizeReader returns the size of the object in bytes. -type SizeReader interface { - // Size returns the size in bytes. - Size() int64 -} - // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -183,7 +177,6 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` - NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -214,24 +207,24 @@ const metaFilename = "meta.json" func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } -func readMetaFile(dir string) (*BlockMeta, error) { +func readMetaFile(dir string) (*BlockMeta, int64, error) { b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) if err != nil { - return nil, err + return nil, 0, err } var m BlockMeta if err := json.Unmarshal(b, &m); err != nil { - return nil, err + return nil, 0, err } if m.Version != 1 { - return nil, errors.Errorf("unexpected meta file version %d", m.Version) + return nil, 0, errors.Errorf("unexpected meta file version %d", m.Version) } - return &m, nil + return &m, int64(len(b)), nil } -func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error { +func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error) { meta.Version = 1 // Make any changes to the file appear atomic. @@ -245,26 +238,32 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error { f, err := os.Create(tmp) if err != nil { - return err + return 0, err } - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") + jsonMeta, err := json.MarshalIndent(meta, "", "\t") + if err != nil { + return 0, err + } var merr tsdb_errors.MultiError - if merr.Add(enc.Encode(meta)); merr.Err() != nil { + n, err := f.Write(jsonMeta) + if err != nil { + merr.Add(err) merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } + // Force the kernel to persist the file on disk to avoid data loss if the host crashes. - if merr.Add(f.Sync()); merr.Err() != nil { + if err := f.Sync(); err != nil { + merr.Add(err) merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } if err := f.Close(); err != nil { - return err + return 0, err } - return fileutil.Replace(tmp, path) + return int64(n), fileutil.Replace(tmp, path) } // Block represents a directory of time series data covering a continuous time range. @@ -285,6 +284,11 @@ type Block struct { tombstones TombstoneReader logger log.Logger + + numBytesChunks int64 + numBytesIndex int64 + numBytesTombstone int64 + numBytesMeta int64 } // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used @@ -302,7 +306,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er err = merr.Err() } }() - meta, err := readMetaFile(dir) + meta, sizeMeta, err := readMetaFile(dir) if err != nil { return nil, err } @@ -319,43 +323,28 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er } closers = append(closers, ir) - tr, tsr, err := readTombstones(dir) + tr, sizeTomb, err := readTombstones(dir) if err != nil { return nil, err } closers = append(closers, tr) - // TODO refactor to set this at block creation time as - // that would be the logical place for a block size to be calculated. - bs := blockSize(cr, ir, tsr) - meta.Stats.NumBytes = bs - err = writeMetaFile(logger, dir, meta) - if err != nil { - level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) - } - pb = &Block{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, - tombstones: tr, - symbolTableSize: ir.SymbolTableSize(), - logger: logger, + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, + tombstones: tr, + symbolTableSize: ir.SymbolTableSize(), + logger: logger, + numBytesChunks: cr.Size(), + numBytesIndex: ir.Size(), + numBytesTombstone: sizeTomb, + numBytesMeta: sizeMeta, } return pb, nil } -func blockSize(rr ...SizeReader) int64 { - var total int64 - for _, r := range rr { - if r != nil { - total += r.Size() - } - } - return total -} - // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -390,7 +379,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime } func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime } // Size returns the number of bytes that the block takes up. -func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } +func (pb *Block) Size() int64 { + return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta +} // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") @@ -437,7 +428,12 @@ func (pb *Block) GetSymbolTableSize() uint64 { func (pb *Block) setCompactionFailed() error { pb.meta.Compaction.Failed = true - return writeMetaFile(pb.logger, pb.dir, &pb.meta) + n, err := writeMetaFile(pb.logger, pb.dir, &pb.meta) + if err != nil { + return err + } + pb.numBytesMeta = n + return nil } type blockIndexReader struct { @@ -561,10 +557,17 @@ Outer: pb.tombstones = stones pb.meta.Stats.NumTombstones = pb.tombstones.Total() - if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil { + n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones) + if err != nil { return err } - return writeMetaFile(pb.logger, pb.dir, &pb.meta) + pb.numBytesTombstone = n + n, err = writeMetaFile(pb.logger, pb.dir, &pb.meta) + if err != nil { + return err + } + pb.numBytesMeta = n + return nil } // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). diff --git a/block_test.go b/block_test.go index 7cd02ddb6..6718051a7 100644 --- a/block_test.go +++ b/block_test.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/tsdbutil" ) @@ -40,9 +41,10 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - testutil.Ok(t, writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{})) + _, err = writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{}) + testutil.Ok(t, err) - meta, err := readMetaFile(dir) + meta, _, err := readMetaFile(dir) testutil.Ok(t, err) testutil.Assert(t, meta.Version != 2, "meta.json version must never be 2") } @@ -149,6 +151,60 @@ func TestCorruptedChunk(t *testing.T) { } } +// TestBlockSize ensures that the block size is calculated correctly. +func TestBlockSize(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test_blockSize") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() + + var ( + blockInit *Block + expSizeInit int64 + blockDirInit string + ) + + // Create a block and compare the reported size vs actual disk size. + { + blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100)) + blockInit, err = OpenBlock(nil, blockDirInit, nil) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, blockInit.Close()) + }() + expSizeInit = blockInit.Size() + actSizeInit, err := testutil.DirSize(blockInit.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expSizeInit, actSizeInit) + } + + // Delete some series and check the sizes again. + { + testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*"))) + expAfterDelete := blockInit.Size() + testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit) + actAfterDelete, err := testutil.DirSize(blockDirInit) + testutil.Ok(t, err) + testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") + + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil) + testutil.Ok(t, err) + blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) + testutil.Ok(t, err) + blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, blockAfterCompact.Close()) + }() + expAfterCompact := blockAfterCompact.Size() + actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir()) + testutil.Ok(t, err) + testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact) + testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size") + } +} + // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []Series) string { head := createHead(tb, series) diff --git a/compact.go b/compact.go index 4a56f585c..10368250d 100644 --- a/compact.go +++ b/compact.go @@ -178,7 +178,7 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) { var dms []dirMeta for _, dir := range dirs { - meta, err := readMetaFile(dir) + meta, _, err := readMetaFile(dir) if err != nil { return nil, err } @@ -380,7 +380,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u start := time.Now() for _, d := range dirs { - meta, err := readMetaFile(d) + meta, _, err := readMetaFile(d) if err != nil { return uid, err } @@ -420,12 +420,14 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if meta.Stats.NumSamples == 0 { for _, b := range bs { b.meta.Compaction.Deletable = true - if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil { + n, err := writeMetaFile(c.logger, b.dir, &b.meta) + if err != nil { level.Error(c.logger).Log( "msg", "Failed to write 'Deletable' to meta file after compaction", "ulid", b.meta.ULID, ) } + b.numBytesMeta = n } uid = ulid.ULID{} level.Info(c.logger).Log( @@ -600,12 +602,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return nil } - if err = writeMetaFile(c.logger, tmp, meta); err != nil { + if _, err = writeMetaFile(c.logger, tmp, meta); err != nil { return errors.Wrap(err, "write merged meta") } // Create an empty tombstones file. - if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { + if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } diff --git a/db.go b/db.go index 127019b47..257a258f1 100644 --- a/db.go +++ b/db.go @@ -629,7 +629,7 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err corrupted = make(map[ulid.ULID]error) for _, dir := range dirs { - meta, err := readMetaFile(dir) + meta, _, err := readMetaFile(dir) if err != nil { level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) continue diff --git a/db_test.go b/db_test.go index 20dcf6306..27ab82cdf 100644 --- a/db_test.go +++ b/db_test.go @@ -100,9 +100,6 @@ func TestDB_reloadOrder(t *testing.T) { testutil.Ok(t, db.reload()) blocks := db.Blocks() - for _, b := range blocks { - b.meta.Stats.NumBytes = 0 - } testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime) @@ -1060,7 +1057,8 @@ func TestSizeRetention(t *testing.T) { testutil.Ok(t, db.reload()) // Reload the db to register the new db size. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. - actSize := dbDiskSize(db.Dir()) + actSize, err := testutil.DirSize(db.Dir()) + testutil.Ok(t, err) testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") // Decrease the max bytes limit so that a delete is triggered. @@ -1074,7 +1072,8 @@ func TestSizeRetention(t *testing.T) { actBlocks := db.Blocks() expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) - actSize = dbDiskSize(db.Dir()) + actSize, err = testutil.DirSize(db.Dir()) + testutil.Ok(t, err) testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") @@ -1085,20 +1084,6 @@ func TestSizeRetention(t *testing.T) { } -func dbDiskSize(dir string) int64 { - var statSize int64 - filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - // Include only index,tombstone and chunks. - if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || - info.Name() == indexFilename || - info.Name() == tombstoneFilename { - statSize += info.Size() - } - return nil - }) - return statSize -} - func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { db, delete := openTestDB(t, nil) defer func() { diff --git a/repair.go b/repair.go index 38138b12a..1d299047a 100644 --- a/repair.go +++ b/repair.go @@ -109,7 +109,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { } // Reset version of meta.json to 1. meta.Version = 1 - if err := writeMetaFile(logger, d, meta); err != nil { + if _, err := writeMetaFile(logger, d, meta); err != nil { return wrapErr(err, d) } } diff --git a/repair_test.go b/repair_test.go index 15e85ff64..1d6345cd1 100644 --- a/repair_test.go +++ b/repair_test.go @@ -65,7 +65,7 @@ func TestRepairBadIndexVersion(t *testing.T) { // Check the current db. // In its current state, lookups should fail with the fixed code. - _, err := readMetaFile(dbDir) + _, _, err := readMetaFile(dbDir) testutil.NotOk(t, err) // Touch chunks dir in block. @@ -121,7 +121,7 @@ func TestRepairBadIndexVersion(t *testing.T) { {{"a", "2"}, {"b", "1"}}, }, res) - meta, err := readMetaFile(tmpDbDir) + meta, _, err := readMetaFile(tmpDbDir) testutil.Ok(t, err) testutil.Assert(t, meta.Version == 1, "unexpected meta version %d", meta.Version) } diff --git a/tombstones.go b/tombstones.go index 220af4900..d7b76230c 100644 --- a/tombstones.go +++ b/tombstones.go @@ -54,14 +54,15 @@ type TombstoneReader interface { Close() error } -func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error { +func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" hash := newCRC32() + var size int f, err := os.Create(tmp) if err != nil { - return err + return 0, err } defer func() { if f != nil { @@ -79,10 +80,11 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error // Write the meta. buf.PutBE32(MagicTombstone) buf.PutByte(tombstoneFormatV1) - _, err = f.Write(buf.Get()) + n, err := f.Write(buf.Get()) if err != nil { - return err + return 0, err } + size += n mw := io.MultiWriter(f, hash) @@ -94,32 +96,34 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) - _, err = mw.Write(buf.Get()) + n, err = mw.Write(buf.Get()) if err != nil { return err } + size += n } return nil }); err != nil { - return fmt.Errorf("error writing tombstones: %v", err) + return 0, fmt.Errorf("error writing tombstones: %v", err) } - _, err = f.Write(hash.Sum(nil)) + n, err = f.Write(hash.Sum(nil)) if err != nil { - return err + return 0, err } + size += n var merr tsdb_errors.MultiError if merr.Add(f.Sync()); merr.Err() != nil { merr.Add(f.Close()) - return merr.Err() + return 0, merr.Err() } if err = f.Close(); err != nil { - return err + return 0, err } f = nil - return fileutil.Replace(tmp, path) + return int64(size), fileutil.Replace(tmp, path) } // Stone holds the information on the posting and time-range @@ -129,41 +133,37 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, SizeReader, error) { +func readTombstones(dir string) (TombstoneReader, int64, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil, nil + return newMemTombstones(), 0, nil } else if err != nil { - return nil, nil, err - } - - sr := &TombstoneFile{ - size: int64(len(b)), + return nil, 0, err } if len(b) < 5 { - return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header") + return nil, 0, errors.Wrap(encoding.ErrInvalidSize, "tombstones header") } d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum. if mg := d.Be32(); mg != MagicTombstone { - return nil, sr, fmt.Errorf("invalid magic number %x", mg) + return nil, 0, fmt.Errorf("invalid magic number %x", mg) } if flag := d.Byte(); flag != tombstoneFormatV1 { - return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) + return nil, 0, fmt.Errorf("invalid tombstone format %x", flag) } if d.Err() != nil { - return nil, sr, d.Err() + return nil, 0, d.Err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.Get()); err != nil { - return nil, sr, errors.Wrap(err, "write to hash") + return nil, 0, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, sr, errors.New("checksum did not match") + return nil, 0, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -173,13 +173,13 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) { mint := d.Varint64() maxt := d.Varint64() if d.Err() != nil { - return nil, sr, d.Err() + return nil, 0, d.Err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, sr, nil + return stonesMap, int64(len(b)), nil } type memTombstones struct { @@ -230,16 +230,6 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } -// TombstoneFile holds information about the tombstone file. -type TombstoneFile struct { - size int64 -} - -// Size returns the tombstone file size. -func (t *TombstoneFile) Size() int64 { - return t.size -} - func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index 7b0d70b6b..33ebb3bcc 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -47,7 +47,8 @@ func TestWriteAndReadbackTombStones(t *testing.T) { stones.addInterval(ref, dranges...) } - testutil.Ok(t, writeTombstoneFile(log.NewNopLogger(), tmpdir, stones)) + _, err := writeTombstoneFile(log.NewNopLogger(), tmpdir, stones) + testutil.Ok(t, err) restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err)