diff --git a/CHANGELOG.md b/CHANGELOG.md index af44615e7..184734537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,15 @@ ## master / unreleased + +## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: + - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - new public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. + - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. diff --git a/block.go b/block.go index e5a66bd9e..42e11d951 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + // Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -182,6 +191,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Indicates that during compaction it resulted in a block without any samples + // so it should be deleted on the next reload. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` @@ -257,7 +269,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + total += r.Size() + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") diff --git a/block_test.go b/block_test.go index cf8716062..1a0409194 100644 --- a/block_test.go +++ b/block_test.go @@ -45,15 +45,15 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - blockDir := createBlock(t, tmpdir, 0, 0, 0) - b, err := OpenBlock(blockDir, nil) + blockDir := createBlock(t, tmpdir, 1, 0, 0) + b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Ok(t, b.setCompactionFailed()) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) - b, err = OpenBlock(blockDir, nil) + b, err = OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) @@ -68,17 +68,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries) testutil.Ok(tb, err) - var ref uint64 + refs := make([]uint64, nSeries) for ts := mint; ts <= maxt; ts++ { app := head.Appender() - for _, lbl := range lbls { - err := app.AddFast(ref, ts, rand.Float64()) - if err == nil { - continue + for i, lbl := range lbls { + if refs[i] != 0 { + err := app.AddFast(refs[i], ts, rand.Float64()) + if err == nil { + continue + } } - ref, err = app.Add(lbl, int64(ts), rand.Float64()) + ref, err := app.Add(lbl, int64(ts), rand.Float64()) testutil.Ok(tb, err) + refs[i] = ref } err := app.Commit() testutil.Ok(tb, err) @@ -91,6 +94,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) testutil.Ok(tb, err) - return filepath.Join(dir, ulid.String()) } diff --git a/checkpoint.go b/checkpoint.go index d1cad4c10..1c6239232 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) defer sgmReader.Close() } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { @@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } + // Ensures that an early return caused by an error doesn't leave any tmp files. + defer func() { + cp.Close() + os.RemoveAll(cpdirtmp) + }() + r := wal.NewReader(sgmReader) var ( diff --git a/checkpoint_test.go b/checkpoint_test.go index 8538cf0c6..8b13c152a 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -15,11 +15,14 @@ package tsdb import ( + "fmt" "io/ioutil" "os" "path/filepath" + "strings" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) { {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, }, series) } + +func TestCheckpointNoTmpFolderAfterError(t *testing.T) { + // Create a new wal with an invalid records. + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + w, err := wal.NewSize(nil, nil, dir, 64*1024) + testutil.Ok(t, err) + testutil.Ok(t, w.Log([]byte{99})) + w.Close() + + // Run the checkpoint and since the wal contains an invalid records this should return an error. + _, err = Checkpoint(w, 0, 1, nil, 0) + testutil.NotOk(t, err) + + // Walk the wal dir to make sure there are no tmp folder left behind after the error. + err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrapf(err, "access err %q: %v\n", path, err) + } + if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { + return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) + } + return nil + }) + testutil.Ok(t, err) +} diff --git a/chunks/chunks.go b/chunks/chunks.go index 11278e0b9..aa2737fdf 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -203,6 +203,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error { for _, c := range chks { maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += int64(len(c.Chunk.Bytes())) + maxLen += 4 // The 4 bytes of crc32 } newsz := w.n + maxLen @@ -282,17 +283,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -302,7 +301,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -325,9 +326,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -343,6 +345,11 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. +func (s *Reader) Size() int64 { + return s.size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 01a2858f1..f262d0dd8 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -138,7 +138,7 @@ func (b *writeBenchmark) run() { } b.storage = st - var metrics []labels.Labels + var labels []labels.Labels measureTime("readData", func() { f, err := os.Open(b.samplesFile) @@ -147,7 +147,7 @@ func (b *writeBenchmark) run() { } defer f.Close() - metrics, err = readPrometheusLabels(f, b.numMetrics) + labels, err = readPrometheusLabels(f, b.numMetrics) if err != nil { exitWithError(err) } @@ -157,7 +157,7 @@ func (b *writeBenchmark) run() { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(labels, 3000) if err != nil { exitWithError(err) } @@ -213,7 +213,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u return total, nil } -func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { +func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount int, baset int64) (uint64, error) { ts := baset type sample struct { @@ -222,9 +222,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount ref *uint64 } - scrape := make([]*sample, 0, len(metrics)) + scrape := make([]*sample, 0, len(lbls)) - for _, m := range metrics { + for _, m := range lbls { scrape = append(scrape, &sample{ labels: m, value: 123456789, diff --git a/compact.go b/compact.go index ef31ec887..56c35539d 100644 --- a/compact.go +++ b/compact.go @@ -55,12 +55,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -185,13 +190,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return res, nil } - // Compact any blocks that have >5% tombstones. + // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { break } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } @@ -346,7 +350,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } @@ -365,15 +369,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } return uid, nil } @@ -412,6 +435,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } @@ -489,11 +516,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to @@ -505,6 +527,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") diff --git a/db.go b/db.go index 43cfadd46..1349dfbd5 100644 --- a/db.go +++ b/db.go @@ -58,6 +58,13 @@ type Options struct { // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. + // NOTE: For proper storage calculations need to consider + // the size of the WAL folder which is not added when calculating + // the current size of the database. + MaxBytes int64 + // The sizes of the Blocks. BlockRanges []int64 @@ -127,11 +134,12 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter - cutoffs prometheus.Counter - cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + sizeRetentionCount prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }) m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_skipped_total", Help: "Total number of skipped compactions due to disabled auto compaction.", }) - m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", - Help: "Number of times the database cut off block data from disk.", - }) - m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", - Help: "Number of times the database failed to cut off block data from disk.", - }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", @@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -340,25 +353,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -423,7 +417,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -432,6 +427,14 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } runtime.GC() } @@ -474,8 +477,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. // Blocks that are obsolete due to replacement or retention will be deleted. func (db *DB) reload() (err error) { defer func() { @@ -485,112 +487,193 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - dirs, err := blockDirs(db.dir) + loadable, corrupted, err := db.openBlocks() if err != nil { - return errors.Wrap(err, "find blocks") + return err } - // We delete old blocks that have been superseded by new ones by gathering all parents - // from existing blocks. Those parents all have newer replacements and can be safely deleted - // after we loaded the other blocks. - // This makes us resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. By creating - // blocks with their parents, we can pick up the deletion where it left off during a crash. - var ( - blocks []*Block - corrupted = map[ulid.ULID]error{} - opened = map[ulid.ULID]struct{}{} - deleteable = map[ulid.ULID]struct{}{} - ) - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - // The block was potentially in the middle of being deleted during a crash. - // Skip it since we may delete it properly further down again. - level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) - ulid, err2 := ulid.Parse(filepath.Base(dir)) - if err2 != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) - continue - } - corrupted[ulid] = err - continue - } - if db.beyondRetention(meta) { - deleteable[meta.ULID] = struct{}{} - continue - } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} + deletable := db.deletableBlocks(loadable) + + // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + deletable[b.ULID] = nil } } - // Blocks we failed to open should all be those we are want to delete anyway. - for c, err := range corrupted { - if _, ok := deleteable[c]; !ok { - return errors.Wrapf(err, "unexpected corrupted block %s", c) - } + if len(corrupted) > 0 { + return errors.Wrap(err, "unexpected corrupted block") } - // Load new blocks into memory. - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return errors.Wrapf(err, "read meta information %s", dir) - } - // Don't load blocks that are scheduled for deletion. - if _, ok := deleteable[meta.ULID]; ok { + + // All deletable blocks should not be loaded. + var ( + bb []*Block + blocksSize int64 + ) + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block continue } - // See if we already have the block in memory or open it otherwise. - b, ok := db.getBlock(meta.ULID) - if !ok { - b, err = OpenBlock(dir, db.chunkPool) - if err != nil { - return errors.Wrapf(err, "open block %s", dir) - } - } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + bb = append(bb, block) + blocksSize += block.Size() + } - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { + if err := validateBlockSequence(loadable); err != nil { return errors.Wrap(err, "invalid block sequence") } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. + // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() oldBlocks := db.blocks - db.blocks = blocks + db.blocks = loadable db.mtx.Unlock() - // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue - } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { - if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { - return errors.Wrapf(err, "delete obsolete block %s", ulid) - } + + if err := db.deleteBlocks(deletable); err != nil { + return err } // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. - if len(blocks) == 0 { + if len(loadable) == 0 { return nil } - maxt := blocks[len(blocks)-1].Meta().MaxTime + + maxt := loadable[len(loadable)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + + corrupted = make(map[ulid.ULID]error) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + + // See if we already have the block in memory or open it otherwise. + block, ok := db.getBlock(meta.ULID) + if !ok { + block, err = OpenBlock(db.logger, dir, db.chunkPool) + if err != nil { + corrupted[meta.ULID] = err + continue + } + } + blocks = append(blocks, block) + } + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime + }) + + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block + } + + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } + + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + for i, block := range blocks { + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break + } + } + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break + } + } + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) + } + } + return nil +} + // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. func validateBlockSequence(bs []*Block) error { if len(bs) <= 1 { diff --git a/db_test.go b/db_test.go index f835fc8d0..1f1e997ad 100644 --- a/db_test.go +++ b/db_test.go @@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) { testutil.Ok(t, db.reload()) blocks := db.Blocks() + for _, b := range blocks { + b.meta.Stats.NumBytes = 0 + } testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime) @@ -829,8 +832,8 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), 0, 0, 0) - block, err := OpenBlock(blockDir, nil) + blockDir := createBlock(t, db.Dir(), 1, 0, 0) + block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. tomb := newMemTombstones() @@ -873,7 +876,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -897,59 +900,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } -func TestDB_Retention(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - - lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} - - app := db.Appender() - _, err := app.Add(lbls, 0, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // create snapshot to make it create a block. - // TODO(gouthamve): Add a method to compact headblock. - snap, err := ioutil.TempDir("", "snap") - testutil.Ok(t, err) - - defer os.RemoveAll(snap) - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) - testutil.Ok(t, err) - - testutil.Equals(t, 1, len(db.blocks)) - - app = db.Appender() - _, err = app.Add(lbls, 100, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // Snapshot again to create another block. - snap, err = ioutil.TempDir("", "snap") - testutil.Ok(t, err) - defer os.RemoveAll(snap) - - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, &Options{ - RetentionDuration: 10, - BlockRanges: []int64{50}, +func TestTimeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1000}, }) - testutil.Ok(t, err) + defer close() defer db.Close() - testutil.Equals(t, 2, len(db.blocks)) + blocks := []*BlockMeta{ + {MinTime: 500, MaxTime: 900}, // Oldest block + {MinTime: 1000, MaxTime: 1500}, + {MinTime: 1500, MaxTime: 2000}, // Newest Block + } - // Reload blocks, which should drop blocks beyond the retention boundary. + for _, m := range blocks { + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + } + + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + + db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) testutil.Ok(t, db.reload()) - testutil.Equals(t, 1, len(db.blocks)) - testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch") + testutil.Equals(t, len(expBlocks), len(actBlocks)) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime) + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime) +} + +func TestSizeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + + blocks := []*BlockMeta{ + {MinTime: 100, MaxTime: 200}, // Oldest block + {MinTime: 200, MaxTime: 300}, + {MinTime: 300, MaxTime: 400}, + {MinTime: 400, MaxTime: 500}, + {MinTime: 500, MaxTime: 600}, // Newest Block + } + + for _, m := range blocks { + createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + } + + // Test that registered size matches the actual disk size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. + actSize := dbDiskSize(db.Dir()) + testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") + + // Decrease the max bytes limit so that a delete is triggered. + // Check total size, total count and check that the oldest block was deleted. + firstBlockSize := db.Blocks()[0].Size() + sizeLimit := actSize - firstBlockSize + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) + actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) + actSize = dbDiskSize(db.Dir()) + + testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") + testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit) + testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block") + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block") + +} + +func dbDiskSize(dir string) int64 { + var statSize int64 + filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + // Include only index,tombstone and chunks. + if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || + info.Name() == indexFilename || + info.Name() == tombstoneFilename { + statSize += info.Size() + } + return nil + }) + return statSize } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { @@ -1318,6 +1360,109 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } +func TestNoEmptyBlocks(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewMustRegexpMatcher("", ".*") + + t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { + testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here") + }) + + t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + + app = db.Appender() + _, err = app.Add(defaultLabel, 1, 0) + testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + + // Adding new blocks. + currentTime := db.Head().MaxTime() + _, err = app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Ok(t, db.compact()) + testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") + }) + + t.Run(`When no new block is created from head, and there are some blocks on disk + compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { + oldBlocks := db.Blocks() + app := db.Appender() + currentTime := db.Head().MaxTime() + _, err := app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + testutil.Equals(t, oldBlocks, db.Blocks()) + }) + + t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { + currentTime := db.Head().MaxTime() + blocks := []*BlockMeta{ + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, + {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, + } + for _, m := range blocks { + createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) + } + + oldBlocks := db.Blocks() + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.") + }) +} + func TestDB_LabelNames(t *testing.T) { tests := []struct { // Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk -> @@ -1422,12 +1567,13 @@ func TestCorrectNumTombstones(t *testing.T) { defer db.Close() blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*blockRange+j, 0) testutil.Ok(t, err) } } @@ -1437,17 +1583,17 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 1, len(db.blocks)) - testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(0, 1, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(2, 3, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(5, 6, defaultMatcher)) testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(9, 11, defaultMatcher)) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go index 9afb1a055..a06b31416 100644 --- a/index/encoding_helpers.go +++ b/index/encoding_helpers.go @@ -18,6 +18,8 @@ import ( "hash" "hash/crc32" "unsafe" + + "github.com/pkg/errors" ) // enbuf is a helper type to populate a byte slice with various types. @@ -83,6 +85,60 @@ type decbuf struct { e error } +// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. +func newDecbufAt(bs ByteSlice, off int) decbuf { + if bs.Len() < off+4 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if bs.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func newDecbufUvarintAt(bs ByteSlice, off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if bs.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.Errorf("invalid uvarint %d", n)} + } + + if bs.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec +} + func (d *decbuf) uvarint() int { return int(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } diff --git a/index/index.go b/index/index.go index 0c1018dba..c65a454ef 100644 --- a/index/index.go +++ b/index/index.go @@ -20,6 +20,7 @@ import ( "hash" "hash/crc32" "io" + "io/ioutil" "math" "os" "path/filepath" @@ -35,9 +36,13 @@ import ( const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 + // HeaderLen represents number of bytes reserved of index for header. + HeaderLen = 5 - indexFormatV1 = 1 - indexFormatV2 = 2 + // FormatV1 represents 1 version of index. + FormatV1 = 1 + // FormatV2 represents 2 version of index. + FormatV2 = 2 labelNameSeperator = "\xff" ) @@ -108,7 +113,7 @@ type Writer struct { fbuf *bufio.Writer pos uint64 - toc indexTOC + toc TOC stage indexWriterStage // Reusable memory. @@ -129,13 +134,42 @@ type Writer struct { Version int } -type indexTOC struct { - symbols uint64 - series uint64 - labelIndices uint64 - labelIndicesTable uint64 - postings uint64 - postingsTable uint64 +// TOC represents index Table Of Content that states where each section of index starts. +type TOC struct { + Symbols uint64 + Series uint64 + LabelIndices uint64 + LabelIndicesTable uint64 + Postings uint64 + PostingsTable uint64 +} + +// NewTOCFromByteSlice return parsed TOC from given index byte slice. +func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { + if bs.Len() < indexTOCLen { + return nil, errInvalidSize + } + b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return nil, errors.Wrap(errInvalidChecksum, "read TOC") + } + + if err := d.err(); err != nil { + return nil, err + } + + return &TOC{ + Symbols: d.be64(), + Series: d.be64(), + LabelIndices: d.be64(), + LabelIndicesTable: d.be64(), + Postings: d.be64(), + PostingsTable: d.be64(), + }, nil } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. @@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. switch s { case idxStageSymbols: - w.toc.symbols = w.pos + w.toc.Symbols = w.pos case idxStageSeries: - w.toc.series = w.pos + w.toc.Series = w.pos case idxStageLabelIndex: - w.toc.labelIndices = w.pos + w.toc.LabelIndices = w.pos case idxStagePostings: - w.toc.postings = w.pos + w.toc.Postings = w.pos case idxStageDone: - w.toc.labelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.pos if err := w.writeOffsetTable(w.labelIndexes); err != nil { return err } - w.toc.postingsTable = w.pos + w.toc.PostingsTable = w.pos if err := w.writeOffsetTable(w.postings); err != nil { return err } @@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { func (w *Writer) writeMeta() error { w.buf1.reset() w.buf1.putBE32(MagicIndex) - w.buf1.putByte(indexFormatV2) + w.buf1.putByte(FormatV2) return w.write(w.buf1.get()) } @@ -436,12 +470,12 @@ const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { w.buf1.reset() - w.buf1.putBE64(w.toc.symbols) - w.buf1.putBE64(w.toc.series) - w.buf1.putBE64(w.toc.labelIndices) - w.buf1.putBE64(w.toc.labelIndicesTable) - w.buf1.putBE64(w.toc.postings) - w.buf1.putBE64(w.toc.postingsTable) + w.buf1.putBE64(w.toc.Symbols) + w.buf1.putBE64(w.toc.Series) + w.buf1.putBE64(w.toc.LabelIndices) + w.buf1.putBE64(w.toc.LabelIndicesTable) + w.buf1.putBE64(w.toc.Postings) + w.buf1.putBE64(w.toc.PostingsTable) w.buf1.putHash(w.crc32) @@ -533,15 +567,14 @@ type StringTuples interface { } type Reader struct { - // The underlying byte slice holding the encoded series data. - b ByteSlice - toc indexTOC + b ByteSlice // Close that releases the underlying resources of the byte slice. c io.Closer // Cached hashmaps of section offsets. - labels map[string]uint64 + labels map[string]uint64 + // LabelName to LabelValue to offset map. postings map[string]map[string]uint64 // Cache of read symbols. Strings that are returned when reading from the // block are always backed by true strings held in here rather than @@ -549,13 +582,12 @@ type Reader struct { // prevents memory faults when applications work with read symbols after // the block has been unmapped. The older format has sparse indexes so a map // must be used, but the new format is not so we can use a slice. - symbols map[uint32]string - symbolSlice []string + symbolsV1 map[uint32]string + symbolsV2 []string + symbolsTableSize uint64 dec *Decoder - crc32 hash.Hash32 - version int } @@ -584,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewReader returns a new IndexReader on the given byte slice. It automatically +// NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil) + return newReader(b, ioutil.NopCloser(nil)) } // NewFileReader returns a new index reader against the given index file. @@ -603,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r := &Reader{ b: b, c: c, - symbols: map[uint32]string{}, labels: map[string]uint64{}, postings: map[string]map[string]uint64{}, - crc32: newCRC32(), } // Verify header. - if b.Len() < 5 { + if r.b.Len() < HeaderLen { return nil, errors.Wrap(errInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { @@ -618,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != indexFormatV1 && r.version != indexFormatV2 { + if r.version != FormatV1 && r.version != FormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } - if err := r.readTOC(); err != nil { + toc, err := NewTOCFromByteSlice(b) + if err != nil { return nil, errors.Wrap(err, "read TOC") } - if err := r.readSymbols(int(r.toc.symbols)); err != nil { + + r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols)) + if err != nil { return nil, errors.Wrap(err, "read symbols") } - var err error // Use the strings already allocated by symbols, rather than // re-allocating them again below. - symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice)) - for _, s := range r.symbols { - symbols[s] = s + // Additionally, calculate symbolsTableSize. + allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2)) + for _, s := range r.symbolsV1 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - for _, s := range r.symbolSlice { - symbols[s] = s + for _, s := range r.symbolsV2 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error { if len(key) != 1 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for label indices table %d", len(key)) } - r.labels[symbols[key[0]]] = off + + r.labels[allocatedSymbols[key[0]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read label index table") } + r.postings[""] = map[string]uint64{} - err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error { if len(key) != 2 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { - r.postings[symbols[key[0]]] = map[string]uint64{} + r.postings[allocatedSymbols[key[0]]] = map[string]uint64{} } - r.postings[key[0]][symbols[key[1]]] = off + r.postings[key[0]][allocatedSymbols[key[1]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read postings table") } - r.dec = &Decoder{lookupSymbol: r.lookupSymbol} + r.dec = &Decoder{LookupSymbol: r.lookupSymbol} return r, nil } @@ -687,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { for k, e := range r.postings { for v, start := range e { - d := r.decbufAt(int(start)) + d := newDecbufAt(r.b, int(start)) if d.err() != nil { return nil, d.err() } @@ -700,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { return m, nil } -func (r *Reader) readTOC() error { - if r.b.Len() < indexTOCLen { - return errInvalidSize - } - b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) - - expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := decbuf{b: b[:len(b)-4]} - - if d.crc32() != expCRC { - return errors.Wrap(errInvalidChecksum, "read TOC") - } - - r.toc.symbols = d.be64() - r.toc.series = d.be64() - r.toc.labelIndices = d.be64() - r.toc.labelIndicesTable = d.be64() - r.toc.postings = d.be64() - r.toc.postingsTable = d.be64() - - return d.err() -} - -// decbufAt returns a new decoding buffer. It expects the first 4 bytes -// after offset to hold the big endian encoded content length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufAt(off int) decbuf { - if r.b.Len() < off+4 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+4) - l := int(binary.BigEndian.Uint32(b)) - - if r.b.Len() < off+4+l+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+4, off+4+l+4) - dec := decbuf{b: b[:len(b)-4]} - - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// decbufUvarintAt returns a new decoding buffer. It expects the first bytes -// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufUvarintAt(off int) decbuf { - // We never have to access this method at the far end of the byte slice. Thus just checking - // against the MaxVarintLen32 is sufficient. - if r.b.Len() < off+binary.MaxVarintLen32 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+binary.MaxVarintLen32) - - l, n := binary.Uvarint(b) - if n <= 0 || n > binary.MaxVarintLen32 { - return decbuf{e: errors.Errorf("invalid uvarint %d", n)} - } - - if r.b.Len() < off+n+int(l)+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+n, off+n+int(l)+4) - dec := decbuf{b: b[:len(b)-4]} - - if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// readSymbols reads the symbol table fully into memory and allocates proper strings for them. +// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. -func (r *Reader) readSymbols(off int) error { +func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) { if off == 0 { - return nil + return nil, nil, nil } - d := r.decbufAt(off) + d := newDecbufAt(bs, off) var ( - origLen = d.len() - cnt = d.be32int() - basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d.len()) + origLen = d.len() + cnt = d.be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d.len()) + symbolSlice []string + symbols = map[uint32]string{} ) - if r.version == indexFormatV2 { - r.symbolSlice = make([]string, 0, cnt) + if version == FormatV2 { + symbolSlice = make([]string, 0, cnt) } for d.err() == nil && d.len() > 0 && cnt > 0 { s := d.uvarintStr() - if r.version == indexFormatV2 { - r.symbolSlice = append(r.symbolSlice, s) + if version == FormatV2 { + symbolSlice = append(symbolSlice, s) } else { - r.symbols[nextPos] = s + symbols[nextPos] = s nextPos = basePos + uint32(origLen-d.len()) } cnt-- } - return errors.Wrap(d.err(), "read symbols") + return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols") } -// readOffsetTable reads an offset table at the given position calls f for each -// found entry.f -// If f returns an error it stops decoding and returns the received error, -func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error { - d := r.decbufAt(int(off)) +// ReadOffsetTable reads an offset table and at the given position calls f for each +// found entry. If f returns an error it stops decoding and returns the received error. +func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error { + d := newDecbufAt(bs, int(off)) cnt := d.be32() for d.err() == nil && d.len() > 0 && cnt > 0 { @@ -842,10 +801,10 @@ func (r *Reader) Close() error { } func (r *Reader) lookupSymbol(o uint32) (string, error) { - if int(o) < len(r.symbolSlice) { - return r.symbolSlice[o], nil + if int(o) < len(r.symbolsV2) { + return r.symbolsV2[o], nil } - s, ok := r.symbols[o] + s, ok := r.symbolsV1[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) } @@ -854,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) { // Symbols returns a set of symbols that exist within the index. func (r *Reader) Symbols() (map[string]struct{}, error) { - res := make(map[string]struct{}, len(r.symbols)) + res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2)) - for _, s := range r.symbols { + for _, s := range r.symbolsV1 { res[s] = struct{}{} } - for _, s := range r.symbolSlice { + for _, s := range r.symbolsV2 { res[s] = struct{}{} } return res, nil } -// SymbolTableSize returns the symbol table that is used to resolve symbol references. +// SymbolTableSize returns the symbol table size in bytes. func (r *Reader) SymbolTableSize() uint64 { - var size int - for _, s := range r.symbols { - size += len(s) + 8 - } - for _, s := range r.symbolSlice { - size += len(s) + 8 - } - return uint64(size) + return r.symbolsTableSize } // LabelValues returns value tuples that exist for the given label name tuples. @@ -889,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) nc := d.be32int() d.be32() // consume unused value entry count. @@ -913,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 } // LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { - res := [][]string{} + var res [][]string for s := range r.labels { res = append(res, strings.Split(s, labelNameSeperator)) } @@ -925,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == indexFormatV2 { + if r.version == FormatV2 { offset = id * 16 } - d := r.decbufUvarintAt(int(offset)) + d := newDecbufUvarintAt(r.b, int(offset)) if d.err() != nil { return d.err() } @@ -945,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if !ok { return EmptyPostings(), nil } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) if d.err() != nil { return nil, errors.Wrap(d.err(), "get postings entry") } @@ -962,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) @@ -1059,7 +1016,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - lookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) } // Postings returns a postings list for b and its number of elements. @@ -1087,11 +1044,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e return errors.Wrap(d.err(), "read series label offsets") } - ln, err := dec.lookupSymbol(lno) + ln, err := dec.LookupSymbol(lno) if err != nil { return errors.Wrap(err, "lookup label name") } - lv, err := dec.lookupSymbol(lvo) + lv, err := dec.LookupSymbol(lvo) if err != nil { return errors.Wrap(err, "lookup label value") } diff --git a/index/index_test.go b/index/index_test.go index f915bca28..2edd3956a 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -380,13 +380,28 @@ func TestPersistence_index_e2e(t *testing.T) { } } + gotSymbols, err := ir.Symbols() + testutil.Ok(t, err) + + testutil.Equals(t, len(mi.symbols), len(gotSymbols)) + for s := range mi.symbols { + _, ok := gotSymbols[s] + testutil.Assert(t, ok, "") + } + testutil.Ok(t, ir.Close()) } +func TestDecbufUvariantWithInvalidBuffer(t *testing.T) { + b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) + + db := newDecbufUvarintAt(b, 0) + testutil.NotOk(t, db.err()) +} + func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - r := &Reader{b: b} - db := r.decbufUvarintAt(0) - testutil.NotOk(t, db.err()) + _, err := NewReader(b) + testutil.NotOk(t, err) } diff --git a/querier_test.go b/querier_test.go index 79daa5c01..caaf085fc 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1206,12 +1206,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkPersistedQueries(b *testing.B) { for _, nSeries := range []int{10, 100} { - for _, nSamples := range []int{1000, 10000, 100000} { + for _, nSamples := range []int64{1000, 10000, 100000} { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) testutil.Ok(b, err) defer block.Close() diff --git a/tombstones.go b/tombstones.go index a1f30b59c..078140406 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. +func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index 5bc1a7f2d..95ef42f8f 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) - restr, err := readTombstones(tmpdir) + restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. diff --git a/wal/wal.go b/wal/wal.go index 92374e312..fd90eb90e 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -164,6 +164,7 @@ type WAL struct { page *page // active page stopc chan chan struct{} actorc chan func() + closed bool // To allow calling Close() more than once without blocking. fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() + if w.closed { + return nil + } + // Flush the last page and zero out all its remaining size. // We must not flush an empty page as it would falsely signal // the segment is done if we start writing to it again after opening. @@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } - + w.closed = true return nil } @@ -827,28 +832,13 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch r.curRecTyp { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record") - } - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record") - } - case recLast: - if i == 0 { - return errors.New("unexpected last record") - } - return nil - default: - return errors.Errorf("unexpected record type %d", r.curRecTyp) + if err := validateRecord(r.curRecTyp, i); err != nil { + return err } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + // Only increment i for non-zero records since we use it // to determine valid content record sequences. i++ @@ -899,6 +889,226 @@ func (r *Reader) Offset() int64 { return r.total } +// NewLiveReader returns a new live reader. +func NewLiveReader(r io.Reader) *LiveReader { + return &LiveReader{rdr: r} +} + +// Reader reads WAL records from an io.Reader. It buffers partial record data for +// the next read. +type LiveReader struct { + rdr io.Reader + err error + rec []byte + hdr [recordHeaderSize]byte + buf [pageSize]byte + readIndex int // Index in buf to start at for next read. + writeIndex int // Index in buf to start at for next write. + total int64 // Total bytes processed during reading in calls to Next(). + index int // Used to track partial records, should be 0 at the start of every new record. +} + +func (r *LiveReader) Err() error { + return r.err +} + +func (r *LiveReader) TotalRead() int64 { + return r.total +} + +func (r *LiveReader) fillBuffer() error { + n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) + r.writeIndex += n + return err +} + +// Shift the buffer up to the read index. +func (r *LiveReader) shiftBuffer() { + copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) + r.readIndex = 0 + r.writeIndex = copied +} + +// Next returns true if r.rec will contain a full record. +// False does not indicate that there will never be more data to +// read for the current io.Reader. +func (r *LiveReader) Next() bool { + for { + if r.buildRecord() { + return true + } + if r.err != nil && r.err != io.EOF { + return false + } + if r.readIndex == pageSize { + r.shiftBuffer() + } + if r.writeIndex != pageSize { + if err := r.fillBuffer(); err != nil { + // We expect to get EOF, since we're reading the segment file as it's being written. + if err != io.EOF { + r.err = err + } + return false + } + } + } +} + +// Record returns the current record. +// The returned byte slice is only valid until the next call to Next. +func (r *LiveReader) Record() []byte { + return r.rec +} + +// Rebuild a full record from potentially partial records. Returns false +// if there was an error or if we weren't able to read a record for any reason. +// Returns true if we read a full record. Any record data is appeneded to +// LiveReader.rec +func (r *LiveReader) buildRecord() bool { + for { + // Check that we have data in the internal buffer to read. + if r.writeIndex <= r.readIndex { + return false + } + + // Attempt to read a record, partial or otherwise. + temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) + r.readIndex += n + r.total += int64(n) + if err != nil { + r.err = err + return false + } + + if temp == nil { + return false + } + + rt := recType(r.hdr[0]) + + if rt == recFirst || rt == recFull { + r.rec = r.rec[:0] + } + r.rec = append(r.rec, temp...) + + if err := validateRecord(rt, r.index); err != nil { + r.err = err + r.index = 0 + return false + } + if rt == recLast || rt == recFull { + r.index = 0 + return true + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + r.index++ + } +} + +// Returns an error if the recType and i indicate an invalid record sequence. +// As an example, if i is > 0 because we've read some amount of a partial record +// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull +// instead of a recLast or recMiddle we would have an invalid record. +func validateRecord(typ recType, i int) error { + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record, dropping buffer") + } + return nil + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record, dropping buffer") + } + return nil + case recLast: + if i == 0 { + return errors.New("unexpected last record, dropping buffer") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } +} + +// Read a sub-record (see recType) from the buffer. It could potentially +// be a full record (recFull) if the record fits within the bounds of a single page. +// Returns a byte slice of the record data read, the number of bytes read, and an error +// if there's a non-zero byte in a page term record or the record checksum fails. +// TODO(callum) the EOF errors we're returning from this function should theoretically +// never happen, add a metric for them. +func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { + readIndex := 0 + header[0] = buf[0] + readIndex++ + total++ + + // The rest of this function is mostly from Reader.Next(). + typ := recType(header[0]) + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (total % pageSize) + if k == pageSize { + return nil, 1, nil // Initial 0 byte was last page byte. + } + + if k <= int64(len(buf)-readIndex) { + for _, v := range buf[readIndex : int64(readIndex)+k] { + readIndex++ + if v != 0 { + return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") + } + } + return nil, readIndex, nil + } + // Not enough bytes to read the rest of the page term rec. + // This theoretically should never happen, since we're only shifting the + // internal buffer of the live reader when we read to the end of page. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + if readIndex+recordHeaderSize-1 > len(buf) { + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) + readIndex += recordHeaderSize - 1 + total += int64(recordHeaderSize - 1) + var ( + length = binary.BigEndian.Uint16(header[1:]) + crc = binary.BigEndian.Uint32(header[3:]) + ) + readTo := int(length) + readIndex + if readTo > len(buf) { + if (readTo - readIndex) > pageSize { + return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) + } + // Not enough data to read all of the record data. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + recData := buf[readIndex:readTo] + readIndex += int(length) + total += int64(length) + + // TODO(callum) what should we do here, throw out the record? We should add a metric at least. + if c := crc32.Checksum(recData, castagnoliTable); c != crc { + return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + return recData, readIndex, nil +} + func min(i, j int) int { if i < j { return i diff --git a/wal/wal_test.go b/wal/wal_test.go index 8ac14ebc8..737c44ca9 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -17,15 +17,107 @@ package wal import ( "bytes" "encoding/binary" + "fmt" "hash/crc32" + "io" "io/ioutil" "math/rand" "os" + "path" + "sync" "testing" + "time" "github.com/prometheus/tsdb/testutil" ) +type record struct { + t recType + b []byte +} + +var data = make([]byte, 100000) +var testReaderCases = []struct { + t []record + exp [][]byte + fail bool +}{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, +} + func encodedRecord(t recType, b []byte) []byte { if t == recPageTerm { return append([]byte{0}, b...) @@ -39,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte { // TestReader feeds the reader a stream of encoded records with different types. func TestReader(t *testing.T) { - data := make([]byte, 100000) - _, err := rand.Read(data) - testutil.Ok(t, err) - - type record struct { - t recType - b []byte - } - cases := []struct { - t []record - exp [][]byte - fail bool - }{ - // Sequence of valid records. - { - t: []record{ - {recFull, data[0:200]}, - {recFirst, data[200:300]}, - {recLast, data[300:400]}, - {recFirst, data[400:800]}, - {recMiddle, data[800:900]}, - {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. - {recLast, data[900:900]}, - {recFirst, data[900:1000]}, - {recMiddle, data[1000:1200]}, - {recMiddle, data[1200:30000]}, - {recMiddle, data[30000:30001]}, - {recMiddle, data[30001:30001]}, - {recLast, data[30001:32000]}, - }, - exp: [][]byte{ - data[0:200], - data[200:400], - data[400:900], - data[900:32000], - }, - }, - // Exactly at the limit of one page minus the header size - { - t: []record{ - {recFull, data[0 : pageSize-recordHeaderSize]}, - }, - exp: [][]byte{ - data[:pageSize-recordHeaderSize], - }, - }, - // More than a full page, this exceeds our buffer and can never happen - // when written by the WAL. - { - t: []record{ - {recFull, data[0 : pageSize+1]}, - }, - fail: true, - }, - // Invalid orders of record types. - { - t: []record{{recMiddle, data[:200]}}, - fail: true, - }, - { - t: []record{{recLast, data[:200]}}, - fail: true, - }, - { - t: []record{ - {recFirst, data[:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - { - t: []record{ - {recFirst, data[:100]}, - {recMiddle, data[100:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - // Non-zero data after page termination. - { - t: []record{ - {recFull, data[:100]}, - {recPageTerm, append(make([]byte, 1000), 1)}, - }, - exp: [][]byte{data[:100]}, - fail: true, - }, - } - for i, c := range cases { + for i, c := range testReaderCases { t.Logf("test %d", i) var buf []byte @@ -154,6 +158,191 @@ func TestReader(t *testing.T) { } } +func TestReader_Live(t *testing.T) { + for i, c := range testReaderCases { + t.Logf("test %d", i) + dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i)) + t.Logf("created dir %s", dir) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // we're never going to have more than a single segment file per test case right now + f, err := os.Create(path.Join(dir, "00000000")) + testutil.Ok(t, err) + + // live reader doesn't work on readers created from bytes buffers, + // since we need to be able to write more data to the thing we're + // reading from after the reader has been created + wg := sync.WaitGroup{} + // make sure the reader doesn't start until at least one record is written + wg.Add(1) + go func() { + for i, rec := range c.t { + rec := encodedRecord(rec.t, rec.b) + n, err := f.Write(rec) + testutil.Ok(t, err) + testutil.Assert(t, n > 0, "no bytes were written to wal") + if i == 0 { + wg.Done() + } + } + }() + sr, err := OpenReadSegment(SegmentName(dir, 0)) + testutil.Ok(t, err) + lr := NewLiveReader(sr) + j := 0 + wg.Wait() + caseLoop: + for { + for ; lr.Next(); j++ { + rec := lr.Record() + t.Log("j: ", j) + testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") + if j == len(c.exp)-1 { + break caseLoop + } + + } + + // Because reads and writes are happening concurrently, unless we get an error we should + // attempt to read records again. + if j == 0 && lr.Err() == nil { + continue + } + + if !c.fail && lr.Err() != nil { + t.Fatalf("unexpected error: %s", lr.Err()) + } + if c.fail && lr.Err() == nil { + t.Fatalf("expected error but got none:\n\tinput: %+v", c.t) + } + if lr.Err() != nil { + t.Log("err: ", lr.Err()) + break + } + } + } +} + +func TestWAL_FuzzWriteRead_Live(t *testing.T) { + const count = 5000 + var input [][]byte + lock := sync.RWMutex{} + var recs [][]byte + var index int + + // Get size of segment. + getSegmentSize := func(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err + } + + readSegment := func(r *LiveReader) { + for r.Next() { + rec := r.Record() + lock.RLock() + l := len(input) + lock.RUnlock() + if index >= l { + t.Fatalf("read too many records") + } + lock.RLock() + if !bytes.Equal(input[index], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + index, len(rec), len(input[index])) + } + lock.RUnlock() + index++ + } + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + } + + dir, err := ioutil.TempDir("", "wal_fuzz_live") + t.Log("created dir: ", dir) + testutil.Ok(t, err) + defer func() { + os.RemoveAll(dir) + }() + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + go func() { + for i := 0; i < count; i++ { + var sz int64 + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = pageSize * 8 + } + + rec := make([]byte, rand.Int63n(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + lock.Lock() + input = append(input, rec) + lock.Unlock() + recs = append(recs, rec) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + }() + + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last > seg.i { + for { + readSegment(r) + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + size, err := getSegmentSize(dir, seg.i) + testutil.Ok(t, err) + // make sure we've read all of the current segment before rotating + if r.TotalRead() == size { + break + } + } + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + r = NewLiveReader(seg) + } + case <-readTicker.C: + readSegment(r) + } + if index == count { + break + } + } + testutil.Ok(t, r.Err()) +} func TestWAL_FuzzWriteRead(t *testing.T) { const count = 25000