From d4b9fe801f387be02037ff612899c17895ddc078 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 6 May 2020 21:00:00 +0530 Subject: [PATCH] M-map full chunks of Head from disk (#6679) When appending to the head and a chunk is full it is flushed to the disk and m-mapped (memory mapped) to free up memory Prom startup now happens in these stages - Iterate the m-maped chunks from disk and keep a map of series reference to its slice of mmapped chunks. - Iterate the WAL as usual. Whenever we create a new series, look for it's mmapped chunks in the map created before and add it to that series. If a head chunk is corrupted the currpted one and all chunks after that are deleted and the data after the corruption is recovered from the existing WAL which means that a corruption in m-mapped files results in NO data loss. [Mmaped chunks format](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/head_chunks.md) - main difference is that the chunk for mmaping now also includes series reference because there is no index for mapping series to chunks. [The block chunks](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/chunks.md) are accessed from the index which includes the offsets for the chunks in the chunks file - example - chunks of series ID have offsets 200, 500 etc in the chunk files. In case of mmaped chunks, the offsets are stored in memory and accessed from that. During WAL replay, these offsets are restored by iterating all m-mapped chunks as stated above by matching the series id present in the chunk header and offset of that chunk in that file. **Prombench results** _WAL Replay_ 1h Wal reply time 30% less wal reply time - 4m31 vs 3m36 2h Wal reply time 20% less wal reply time - 8m16 vs 7m _Memory During WAL Replay_ High Churn: 10-15% less RAM - 32gb vs 28gb 20% less RAM after compaction 34gb vs 27gb No Churn: 20-30% less RAM - 23gb vs 18gb 40% less RAM after compaction 32.5gb vs 20gb Screenshots are in [this comment](https://github.com/prometheus/prometheus/pull/6679#issuecomment-621678932) Signed-off-by: Ganesh Vernekar --- cmd/prometheus/query_log_test.go | 14 +- tsdb/block_test.go | 12 +- tsdb/chunks/head_chunks.go | 27 +- tsdb/chunks/head_chunks_test.go | 19 +- tsdb/compact_test.go | 7 +- tsdb/db.go | 26 +- tsdb/db_test.go | 29 +- tsdb/docs/format/head_chunks.md | 2 +- tsdb/head.go | 453 ++++++++++++++----- tsdb/head_bench_test.go | 16 +- tsdb/head_test.go | 751 ++++++++++++++++++++----------- tsdb/querier_bench_test.go | 14 +- tsdb/querier_test.go | 15 +- tsdb/tsdbblockutil.go | 12 +- web/api/v1/api_test.go | 14 +- web/web_test.go | 2 +- 16 files changed, 985 insertions(+), 428 deletions(-) diff --git a/cmd/prometheus/query_log_test.go b/cmd/prometheus/query_log_test.go index 12b2f930f1..eac6f6ec2a 100644 --- a/cmd/prometheus/query_log_test.go +++ b/cmd/prometheus/query_log_test.go @@ -246,7 +246,19 @@ func (p *queryLogTest) run(t *testing.T) { p.setQueryLog(t, "") } - params := append([]string{"-test.main", "--config.file=" + p.configFile.Name(), "--web.enable-lifecycle", fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port)}, p.params()...) + dir, err := ioutil.TempDir("", "query_log_test") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + params := append([]string{ + "-test.main", + "--config.file=" + p.configFile.Name(), + "--web.enable-lifecycle", + fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port), + "--storage.tsdb.path=" + dir, + }, p.params()...) prom := exec.Command(promPath, params...) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index df18a08ace..529ed771a9 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -305,7 +305,12 @@ func TestReadIndexFormatV1(t *testing.T) { // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { - return createBlockFromHead(tb, dir, createHead(tb, series)) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(tb, err) + defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }() + head := createHead(tb, series, chunkDir) + defer func() { testutil.Ok(tb, head.Close()) }() + return createBlockFromHead(tb, dir, head) } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { @@ -321,10 +326,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { return filepath.Join(dir, ulid.String()) } -func createHead(tb testing.TB, series []storage.Series) *Head { - head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize) +func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head { + head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(tb, err) - defer head.Close() app := head.Appender() for _, s := range series { diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index fd111c2dfc..52e20f866b 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -72,13 +72,13 @@ const ( ) // corruptionErr is an error that's returned when corruption is encountered. -type corruptionErr struct { +type CorruptionErr struct { Dir string FileIndex int Err error } -func (e *corruptionErr) Error() string { +func (e *CorruptionErr) Error() string { return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() } @@ -512,7 +512,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { // and runs the provided function on each chunk. It returns on the first error encountered. // NOTE: This method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. -func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) { +func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error) (err error) { cdm.writePathMtx.Lock() defer cdm.writePathMtx.Unlock() @@ -550,7 +550,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, if allZeros { break } - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), @@ -577,12 +577,15 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, idx += ChunkEncodingSize // Skip encoding. dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) - idx += n + int(dataLen) // Skip the data. + idx += n + + numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2)) + idx += int(dataLen) // Skip the data. // In the beginning we only checked for the chunk meta size. // Now that we have added the chunk data length, we check for sufficient bytes again. if idx+CRCSize > fileEnd { - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), @@ -595,7 +598,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, return err } if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), @@ -607,14 +610,14 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mmapFile.maxt = maxt } - if err := f(seriesRef, chunkRef, mint, maxt); err != nil { + if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil { return err } } if idx > fileEnd { // It should be equal to the slice length. - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), @@ -678,11 +681,11 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { return nil } -// Repair deletes all the head chunk files after the one which had the corruption +// DeleteCorrupted deletes all the head chunk files after the one which had the corruption // (including the corrupt file). -func (cdm *ChunkDiskMapper) Repair(originalErr error) error { +func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. - cerr, ok := err.(*corruptionErr) + cerr, ok := err.(*CorruptionErr) if !ok { return errors.Wrap(originalErr, "cannot handle error") } diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index fbea37aeea..073a16955d 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -39,6 +39,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { type expectedDataType struct { seriesRef, chunkRef uint64 mint, maxt int64 + numSamples uint16 chunk chunkenc.Chunk } expectedData := []expectedDataType{} @@ -51,11 +52,12 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) totalChunks++ expectedData = append(expectedData, expectedDataType{ - seriesRef: seriesRef, - mint: mint, - maxt: maxt, - chunkRef: chkRef, - chunk: chunk, + seriesRef: seriesRef, + mint: mint, + maxt: maxt, + chunkRef: chkRef, + chunk: chunk, + numSamples: uint16(chunk.NumSamples()), }) if hrw.curFileSequence != 1 { @@ -128,7 +130,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { testutil.Ok(t, err) idx := 0 - err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error { + err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error { t.Helper() expData := expectedData[idx] @@ -136,6 +138,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) { testutil.Equals(t, expData.chunkRef, chunkRef) testutil.Equals(t, expData.maxt, maxt) testutil.Equals(t, expData.maxt, maxt) + testutil.Equals(t, expData.numSamples, numSamples) actChunk, err := hrw.Chunk(expData.chunkRef) testutil.Ok(t, err) @@ -157,7 +160,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) { }() testutil.Assert(t, !hrw.fileMaxtSet, "") - testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) testutil.Assert(t, hrw.fileMaxtSet, "") timeRange := 0 @@ -227,7 +230,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) { testutil.Ok(t, err) testutil.Assert(t, !hrw.fileMaxtSet, "") - testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) testutil.Assert(t, hrw.fileMaxtSet, "") // Truncating files after restart. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3b72fe02cc..a733c1e990 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -870,7 +870,12 @@ func BenchmarkCompactionFromHead(b *testing.B) { for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { labelValues := totalSeries / labelNames b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender() diff --git a/tsdb/db.go b/tsdb/db.go index e97499c041..17a69b01a8 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -309,7 +309,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize) + head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) if err != nil { return err } @@ -368,7 +368,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize) + head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize) if err != nil { return nil, err } @@ -379,11 +379,14 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu // Also add the WAL if the current blocks don't cover the requests time range. if maxBlockTime <= maxt { + if err := head.Close(); err != nil { + return nil, err + } w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize) + head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) if err != nil { return nil, err } @@ -395,10 +398,10 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu // Set the wal to nil to disable all wal operations. // This is mainly to avoid blocking when closing the head. head.wal = nil - - db.closers = append(db.closers, head) } + db.closers = append(db.closers, head) + // TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance. // Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite. // Option 2: refactor Querier to use another independent func which @@ -583,19 +586,21 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs var wlog *wal.WAL segmentSize := wal.DefaultSegmentSize + walDir := filepath.Join(dir, "wal") // Wal is enabled. if opts.WALSegmentSize >= 0 { // Wal is set to a custom size. if opts.WALSegmentSize > 0 { segmentSize = opts.WALSegmentSize } - wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression) + wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression) if err != nil { return nil, err } } - db.head, err = NewHead(r, l, wlog, rngs[0], opts.StripeSize) + db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize) + if err != nil { return nil, err } @@ -1018,9 +1023,10 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo deletable = make(map[ulid.ULID]*Block) walSize, _ := db.Head().wal.Size() - // Initializing size counter with WAL size, - // as that is part of the retention strategy. - blocksSize := walSize + headChunksSize := db.Head().chunkDiskMapper.Size() + // Initializing size counter with WAL size and Head chunks + // written to disk, as that is part of the retention strategy. + blocksSize := walSize + headChunksSize for i, block := range blocks { blocksSize += block.Size() if blocksSize > int64(db.opts.MaxBytes) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3ce6a3c2a6..6c12147a94 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -849,8 +849,15 @@ func TestWALSegmentSizeOptions(t *testing.T) { tests := map[int]func(dbdir string, segmentSize int){ // Default Wal Size. 0: func(dbDir string, segmentSize int) { - files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) + filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) testutil.Ok(t, err) + files := []os.FileInfo{} + for _, f := range filesAndDir { + if !f.IsDir() { + files = append(files, f) + } + } + // All the full segment files (all but the last) should match the segment size option. for _, f := range files[:len(files)-1] { testutil.Equals(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) } @@ -859,9 +866,16 @@ func TestWALSegmentSizeOptions(t *testing.T) { }, // Custom Wal Size. 2 * 32 * 1024: func(dbDir string, segmentSize int) { - files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) - testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") + filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) testutil.Ok(t, err) + files := []os.FileInfo{} + for _, f := range filesAndDir { + if !f.IsDir() { + files = append(files, f) + } + } + testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") + // All the full segment files (all but the last) should match the segment size option. for _, f := range files[:len(files)-1] { testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) } @@ -870,9 +884,12 @@ func TestWALSegmentSizeOptions(t *testing.T) { }, // Wal disabled. -1: func(dbDir string, segmentSize int) { - if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) { - t.Fatal("wal directory is present when the wal is disabled") - } + // Check that WAL dir is not there. + _, err := os.Stat(filepath.Join(dbDir, "wal")) + testutil.NotOk(t, err) + // Check that there is chunks dir. + _, err = os.Stat(mmappedChunksDir(dbDir)) + testutil.Ok(t, err) }, } for segmentSize, testFunc := range tests { diff --git a/tsdb/docs/format/head_chunks.md b/tsdb/docs/format/head_chunks.md index 32657e8044..5a320fa761 100644 --- a/tsdb/docs/format/head_chunks.md +++ b/tsdb/docs/format/head_chunks.md @@ -33,4 +33,4 @@ Unlike chunks in the on-disk blocks, here we additionally store series reference ┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐ | series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len | data │ CRC32 <4 byte> │ └─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘ -``` +``` \ No newline at end of file diff --git a/tsdb/head.go b/tsdb/head.go index 4b374fd5d4..d56f167e31 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -16,6 +16,7 @@ package tsdb import ( "fmt" "math" + "path/filepath" "runtime" "sort" "strings" @@ -32,6 +33,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" @@ -54,12 +56,13 @@ type Head struct { minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. lastSeriesID uint64 - metrics *headMetrics - wal *wal.WAL - logger log.Logger - appendPool sync.Pool - seriesPool sync.Pool - bytesPool sync.Pool + metrics *headMetrics + wal *wal.WAL + logger log.Logger + appendPool sync.Pool + seriesPool sync.Pool + bytesPool sync.Pool + memChunkPool sync.Pool // All series addressable by their ID or hash. series *stripeSeries @@ -80,27 +83,35 @@ type Head struct { cardinalityMutex sync.Mutex cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. + + // chunkDiskMapper is used to write and read Head chunks to/from disk. + chunkDiskMapper *chunks.ChunkDiskMapper + // chunkDirRoot is the parent directory of the chunks directory. + chunkDirRoot string } type headMetrics struct { - activeAppenders prometheus.Gauge - series prometheus.GaugeFunc - seriesCreated prometheus.Counter - seriesRemoved prometheus.Counter - seriesNotFound prometheus.Counter - chunks prometheus.Gauge - chunksCreated prometheus.Counter - chunksRemoved prometheus.Counter - gcDuration prometheus.Summary - samplesAppended prometheus.Counter - walTruncateDuration prometheus.Summary - walCorruptionsTotal prometheus.Counter - headTruncateFail prometheus.Counter - headTruncateTotal prometheus.Counter - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter + activeAppenders prometheus.Gauge + series prometheus.GaugeFunc + seriesCreated prometheus.Counter + seriesRemoved prometheus.Counter + seriesNotFound prometheus.Counter + chunks prometheus.Gauge + chunksCreated prometheus.Counter + chunksRemoved prometheus.Counter + gcDuration prometheus.Summary + samplesAppended prometheus.Counter + outOfBoundSamples prometheus.Counter + outOfOrderSamples prometheus.Counter + walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter + headTruncateFail prometheus.Counter + headTruncateTotal prometheus.Counter + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + mmapChunkCorruptionTotal prometheus.Counter } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -155,6 +166,14 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }), + outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_out_of_bound_samples_total", + Help: "Total number of out of bound samples ingestion failed attempts.", + }), + outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_out_of_order_samples_total", + Help: "Total number of out of order samples ingestion failed attempts.", + }), headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_truncations_failed_total", Help: "Total number of head truncations that failed.", @@ -179,6 +198,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_checkpoint_creations_total", Help: "Total number of checkpoint creations attempted.", }), + mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_mmap_chunk_corruptions_total", + Help: "Total number of memory-mapped chunk corruptions.", + }), } if r != nil { @@ -195,12 +218,15 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.walTruncateDuration, m.walCorruptionsTotal, m.samplesAppended, + m.outOfBoundSamples, + m.outOfOrderSamples, m.headTruncateFail, m.headTruncateTotal, m.checkpointDeleteFail, m.checkpointDeleteTotal, m.checkpointCreationFail, m.checkpointCreationTotal, + m.mmapChunkCorruptionTotal, // Metrics bound to functions and not needed in tests // can be created and registered on the spot. prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -258,7 +284,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings // stripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int) (*Head, error) { if l == nil { l = log.NewNopLogger() } @@ -278,12 +304,30 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int tombstones: tombstones.NewMemTombstones(), iso: newIsolation(), deleted: map[uint64]int{}, + memChunkPool: sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + }, + chunkDirRoot: chkDirRoot, } h.metrics = newHeadMetrics(h, r) + if pool == nil { + pool = chunkenc.NewPool() + } + + var err error + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), pool) + if err != nil { + return nil, err + } + return h, nil } +func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } + // processWALSamples adds a partition of samples it receives to the head and passes // them on to other workers. // Samples before the mint timestamp are discarded. @@ -312,7 +356,7 @@ func (h *Head) processWALSamples( } refSeries[s.Ref] = ms } - if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -351,7 +395,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } } -func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { +func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -472,7 +516,20 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { for _, s := range v { series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - if !created { + if created { + // If this series gets a duplicate record, we don't restore its mmapped chunks, + // and instead restore everything from WAL records. + series.mmappedChunks = mmappedChunks[series.ref] + + h.metrics.chunks.Add(float64(len(series.mmappedChunks))) + h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks))) + + if len(series.mmappedChunks) > 0 { + h.updateMinMaxTime(series.minTime(), series.maxTime()) + } + } else { + // TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID. + // There's already a different ref for this series. multiRef[s.Ref] = series.ref } @@ -572,8 +629,20 @@ func (h *Head) Init(minValidTime int64) error { return nil } - level.Info(h.logger).Log("msg", "Replaying WAL, this may take awhile") + level.Info(h.logger).Log("msg", "Replaying WAL and on-disk memory mappable chunks if any, this may take a while") start := time.Now() + + mmappedChunks, err := h.loadMmappedChunks() + if err != nil { + level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err) + if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok { + h.metrics.mmapChunkCorruptionTotal.Inc() + } + // If this fails, data will be recovered from WAL. + // Hence we wont lose any data (given WAL is not corrupt). + h.removeCorruptedMmappedChunks(err) + } + // Backfill the checkpoint first if it exists. dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) if err != nil && err != record.ErrNotFound { @@ -593,7 +662,7 @@ func (h *Head) Init(minValidTime int64) error { // A corrupted checkpoint is a hard error for now and requires user // intervention. There's likely little data that can be recovered anyway. - if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil { + if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil { return errors.Wrap(err, "backfill checkpoint") } startFrom++ @@ -614,7 +683,7 @@ func (h *Head) Init(minValidTime int64) error { } sr := wal.NewSegmentBufReader(s) - err = h.loadWAL(wal.NewReader(sr), multiRef) + err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks) if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) } @@ -629,6 +698,54 @@ func (h *Head) Init(minValidTime int64) error { return nil } +func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) { + mmappedChunks := map[uint64][]*mmappedChunk{} + if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error { + if maxt < h.minValidTime { + return nil + } + + slice := mmappedChunks[seriesRef] + if len(slice) > 0 { + if slice[len(slice)-1].maxTime >= mint { + return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef) + } + } + + slice = append(slice, &mmappedChunk{ + ref: chunkRef, + minTime: mint, + maxTime: maxt, + numSamples: numSamples, + }) + mmappedChunks[seriesRef] = slice + return nil + }); err != nil { + return nil, errors.Wrap(err, "iterate on on-disk chunks") + } + return mmappedChunks, nil +} + +// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously +// loaded mmapped chunks. +func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk { + level.Info(h.logger).Log("msg", "Deleting mmapped chunk files") + + if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil { + level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err) + return map[uint64][]*mmappedChunk{} + } + + level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks") + mmappedChunks, err := h.loadMmappedChunks() + if err != nil { + level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err) + mmappedChunks = map[uint64][]*mmappedChunk{} + } + + return mmappedChunks +} + // Truncate removes old data before mint from the head. func (h *Head) Truncate(mint int64) (err error) { defer func() { @@ -662,6 +779,11 @@ func (h *Head) Truncate(mint int64) (err error) { level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start)) h.metrics.gcDuration.Observe(time.Since(start).Seconds()) + // Truncate the chunk m-mapper. + if err := h.chunkDiskMapper.Truncate(mint); err != nil { + return errors.Wrap(err, "truncate chunks.HeadReadWriter") + } + if h.wal == nil { return nil } @@ -947,6 +1069,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() return 0, storage.ErrOutOfBounds } @@ -973,6 +1096,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() return storage.ErrOutOfBounds } @@ -983,6 +1107,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { s.Lock() if err := s.appendable(t, v); err != nil { s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.Inc() + } return err } s.pendingCommit = true @@ -1051,13 +1178,14 @@ func (a *headAppender) Commit() error { for i, s := range a.samples { series = a.sampleSeries[i] series.Lock() - ok, chunkCreated := series.append(s.T, s.V, a.appendID) + ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() if !ok { total-- + a.head.metrics.outOfOrderSamples.Inc() } if chunkCreated { a.head.metrics.chunks.Inc() @@ -1225,10 +1353,11 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReade mint = hmin } return &headChunkReader{ - head: h, - mint: mint, - maxt: maxt, - isoState: is, + head: h, + mint: mint, + maxt: maxt, + isoState: is, + memChunkPool: &h.memChunkPool, } } @@ -1271,16 +1400,19 @@ func (h *Head) compactable() bool { // Close flushes the WAL and closes the head. func (h *Head) Close() error { - if h.wal == nil { - return nil + var merr tsdb_errors.MultiError + merr.Add(h.chunkDiskMapper.Close()) + if h.wal != nil { + merr.Add(h.wal.Close()) } - return h.wal.Close() + return merr.Err() } type headChunkReader struct { - head *Head - mint, maxt int64 - isoState *isolationState + head *Head + mint, maxt int64 + isoState *isolationState + memChunkPool *sync.Pool } func (h *headChunkReader) Close() error { @@ -1315,10 +1447,17 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { } s.Lock() - c := s.chunk(int(cid)) + c, garbageCollect := s.chunk(int(cid), h.head.chunkDiskMapper) + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + c.chunk = nil + h.memChunkPool.Put(c) + } + }() - // This means that the chunk has been garbage collected or is outside - // the specified range. + // This means that the chunk has been garbage collected (or) is outside + // the specified range (or) Head is closing. if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, storage.ErrNotFound @@ -1326,23 +1465,25 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Unlock() return &safeChunk{ - Chunk: c.chunk, - s: s, - cid: int(cid), - isoState: h.isoState, + Chunk: c.chunk, + s: s, + cid: int(cid), + isoState: h.isoState, + chunkDiskMapper: h.head.chunkDiskMapper, }, nil } type safeChunk struct { chunkenc.Chunk - s *memSeries - cid int - isoState *isolationState + s *memSeries + cid int + isoState *isolationState + chunkDiskMapper *chunks.ChunkDiskMapper } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, c.isoState, reuseIter) + it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) c.s.Unlock() return it } @@ -1448,23 +1589,24 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks *chks = (*chks)[:0] - for i, c := range s.chunks { + for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } - // Set the head chunks as open (being appended to). - maxTime := c.maxTime - if s.headChunk == c { - maxTime = math.MaxInt64 - } - *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, - MaxTime: maxTime, + MaxTime: c.maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) } + if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { + *chks = append(*chks, chunks.Meta{ + MinTime: s.headChunk.minTime, + MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). + Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))), + }) + } return nil } @@ -1485,7 +1627,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { } func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { - s := newMemSeries(lset, id, h.chunkRange) + s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool) s, created := h.series.getOrSet(hash, s) if !created { @@ -1611,7 +1753,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { series.Lock() rmChunks += series.truncateChunksBefore(mint) - if len(series.chunks) > 0 || series.pendingCommit { + if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit { series.Unlock() continue } @@ -1704,12 +1846,12 @@ func (s sample) V() float64 { type memSeries struct { sync.RWMutex - ref uint64 - lset labels.Labels - chunks []*memChunk - headChunk *memChunk - chunkRange int64 - firstChunkID int + ref uint64 + lset labels.Labels + mmappedChunks []*mmappedChunk + headChunk *memChunk + chunkRange int64 + firstChunkID int nextAt int64 // Timestamp at which to cut the next chunk. sampleBuf [4]sample @@ -1717,25 +1859,31 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. + memChunkPool *sync.Pool + txs *txRing } -func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { +func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, - txs: newTxRing(4), + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, + txs: newTxRing(4), + memChunkPool: memChunkPool, } return s } func (s *memSeries) minTime() int64 { - if len(s.chunks) == 0 { - return math.MinInt64 + if len(s.mmappedChunks) > 0 { + return s.mmappedChunks[0].minTime } - return s.chunks[0].minTime + if s.headChunk != nil { + return s.headChunk.minTime + } + return math.MinInt64 } func (s *memSeries) maxTime() int64 { @@ -1746,30 +1894,45 @@ func (s *memSeries) maxTime() int64 { return c.maxTime } -func (s *memSeries) cut(mint int64) *memChunk { - c := &memChunk{ +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { + s.mmapCurrentHeadChunk(chunkDiskMapper) + + s.headChunk = &memChunk{ chunk: chunkenc.NewXORChunk(), minTime: mint, maxTime: math.MinInt64, } - s.chunks = append(s.chunks, c) - s.headChunk = c - - // Remove exceeding capacity from the previous chunk byte slice to save memory. - if l := len(s.chunks); l > 1 { - s.chunks[l-2].chunk.Compact() - } // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. s.nextAt = rangeForTimestamp(mint, s.chunkRange) - app, err := c.chunk.Appender() + app, err := s.headChunk.chunk.Appender() if err != nil { panic(err) } s.app = app - return c + return s.headChunk +} + +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { + if s.headChunk == nil { + // There is no head chunk, so nothing to m-map here. + return + } + + chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) + if err != nil { + if err != chunks.ErrChunkDiskMapperClosed { + panic(err) + } + } + s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(s.headChunk.chunk.NumSamples()), + minTime: s.headChunk.minTime, + maxTime: s.headChunk.maxTime, + }) } // appendable checks whether the given sample is valid for appending to the series. @@ -1793,12 +1956,34 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } -func (s *memSeries) chunk(id int) *memChunk { +// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. +// If garbageCollect is true, it means that the returned *memChunk +// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. +func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool) { + // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are + // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. + // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix + // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. ix := id - s.firstChunkID - if ix < 0 || ix >= len(s.chunks) { - return nil + if ix < 0 || ix > len(s.mmappedChunks) { + return nil, false } - return s.chunks[ix] + if ix == len(s.mmappedChunks) { + return s.headChunk, false + } + chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) + if err != nil { + if err == chunks.ErrChunkDiskMapperClosed { + return nil, false + } + // TODO(codesome): Find a better way to handle this error instead of a panic. + panic(err) + } + mc := s.memChunkPool.Get().(*memChunk) + mc.chunk = chk + mc.minTime = s.mmappedChunks[ix].minTime + mc.maxTime = s.mmappedChunks[ix].maxTime + return mc, true } func (s *memSeries) chunkID(pos int) int { @@ -1809,27 +1994,32 @@ func (s *memSeries) chunkID(pos int) int { // at or after mint. Chunk IDs remain unchanged. func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { var k int - for i, c := range s.chunks { - if c.maxTime >= mint { - break - } - k = i + 1 - } - s.chunks = append(s.chunks[:0], s.chunks[k:]...) - s.firstChunkID += k - if len(s.chunks) == 0 { + if s.headChunk != nil && s.headChunk.maxTime < mint { + // If head chunk is truncated, we can truncate all mmapped chunks. + k = 1 + len(s.mmappedChunks) + s.firstChunkID += k s.headChunk = nil - } else { - s.headChunk = s.chunks[len(s.chunks)-1] + s.mmappedChunks = nil + return k + } + if len(s.mmappedChunks) > 0 { + for i, c := range s.mmappedChunks { + if c.maxTime >= mint { + break + } + k = i + 1 + } + s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[k:]...) + s.firstChunkID += k } - return k } // append adds the sample (t, v) to the series. The caller also has to provide // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) -func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) { +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -1838,7 +2028,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC c := s.head() if c == nil { - c = s.cut(t) + if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { + // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. + return false, false + } + // There is no chunk in this series yet, create the first chunk for the sample. + c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } numSamples := c.chunk.NumSamples() @@ -1854,7 +2049,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } if t >= s.nextAt { - c = s.cut(t) + c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } s.app.Append(t, v) @@ -1890,8 +2085,19 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { - c := s.chunk(id) +// iterator returns a chunk iterator. +// It is unsafe to call this concurrently with s.append(...) without holding the series lock. +func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { + c, garbageCollect := s.chunk(id, chunkDiskMapper) + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + // This should be done always at the end. + c.chunk = nil + s.memChunkPool.Put(c) + } + }() + // TODO(fabxc): Work around! A querier may have retrieved a pointer to a // series's chunk, which got then garbage collected before it got // accessed. We must ensure to not garbage collect as long as any @@ -1909,12 +2115,18 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera totalSamples := 0 // Total samples in this series. previousSamples := 0 // Samples before this chunk. - for j, d := range s.chunks { - totalSamples += d.chunk.NumSamples() + for j, d := range s.mmappedChunks { + totalSamples += int(d.numSamples) if j < ix { - previousSamples += d.chunk.NumSamples() + previousSamples += int(d.numSamples) } } + // mmappedChunks does not contain the last chunk. Hence check it separately. + if len(s.mmappedChunks) < ix { + previousSamples += s.headChunk.chunk.NumSamples() + } else { + totalSamples += s.headChunk.chunk.NumSamples() + } // Removing the extra transactionIDs that are relevant for samples that // come after this chunk, from the total transactionIDs. @@ -1943,7 +2155,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera return chunkenc.NewNopIterator() } - if id-s.firstChunkID < len(s.chunks)-1 { + if id-s.firstChunkID < len(s.mmappedChunks) { if stopAfter == numSamples { return c.chunk.Iterator(it) } @@ -1989,7 +2201,7 @@ type memChunk struct { minTime, maxTime int64 } -// Returns true if the chunk overlaps [mint, maxt]. +// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt]. func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } @@ -2052,3 +2264,14 @@ func (ss stringset) slice() []string { sort.Strings(slice) return slice } + +type mmappedChunk struct { + ref uint64 + numSamples uint16 + minTime, maxTime int64 +} + +// Returns true if the chunk overlaps [mint, maxt]. +func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool { + return mc.minTime <= maxt && mint <= mc.maxTime +} diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index c1ada34171..bd0f248267 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -14,6 +14,8 @@ package tsdb import ( + "io/ioutil" + "os" "strconv" "sync/atomic" "testing" @@ -23,8 +25,13 @@ import ( ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() @@ -34,8 +41,13 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { } func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ed4a4e51dc..7ac9b599b4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -19,7 +19,6 @@ import ( "math" "math/rand" "os" - "path" "path/filepath" "sort" "strconv" @@ -42,10 +41,11 @@ import ( func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - - h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize) - testutil.Ok(b, err) - defer h.Close() + h, _, closer := newTestHead(b, 10000, false) + defer closer() + defer func() { + testutil.Ok(b, h.Close()) + }() b.ReportAllocs() b.ResetTimer() @@ -173,7 +173,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) testutil.Ok(b, err) h.Init(0) } @@ -209,20 +209,15 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, } - dir, err := ioutil.TempDir("", "test_read_wal") - testutil.Ok(t, err) + + head, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, head.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) - testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(101), head.lastSeriesID) @@ -244,72 +239,78 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, c.Err()) return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, nil))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, nil))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, nil))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) + testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) }) } } func TestHead_WALMultiRef(t *testing.T) { - dir, err := ioutil.TempDir("", "test_wal_multi_ref") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - - head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) + head, w, closer := newTestHead(t, 1000, false) + defer closer() testutil.Ok(t, head.Init(0)) + app := head.Appender() ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) - testutil.Ok(t, head.Truncate(200)) - + // Add another sample outside chunk range to mmap a chunk. app = head.Appender() - ref2, err := app.Add(labels.FromStrings("foo", "bar"), 300, 2) + _, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) - if ref1 == ref2 { - t.Fatal("Refs are the same") - } + testutil.Ok(t, head.Truncate(1600)) + + app = head.Appender() + ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) + + // Add another sample outside chunk range to mmap a chunk. + app = head.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) + + testutil.Assert(t, ref1 != ref2, "Refs are the same") testutil.Ok(t, head.Close()) - w, err = wal.New(nil, nil, dir, false) + w, err = wal.New(nil, nil, w.Dir(), false) testutil.Ok(t, err) - head, err = NewHead(nil, nil, w, 10000, DefaultStripeSize) + head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(0)) - defer head.Close() + defer func() { + testutil.Ok(t, head.Close()) + }() - q, err := NewBlockQuerier(head, 0, 300) + q, err := NewBlockQuerier(head, 0, 2100) testutil.Ok(t, err) series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}, series) + testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: { + sample{100, 1}, + sample{1500, 2}, + sample{1700, 3}, + sample{2000, 4}, + }}, series) } func TestHead_Truncate(t *testing.T) { - dir, err := ioutil.TempDir("", "test_truncate") - testutil.Ok(t, err) + h, _, closer := newTestHead(t, 1000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - - h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() - h.initTime(0) s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) @@ -317,35 +318,35 @@ func TestHead_Truncate(t *testing.T) { s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) - s1.chunks = []*memChunk{ - {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + s1.mmappedChunks = []*mmappedChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, } - s2.chunks = []*memChunk{ - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, + s2.mmappedChunks = []*mmappedChunk{ + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, } - s3.chunks = []*memChunk{ - {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + s3.mmappedChunks = []*mmappedChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, } - s4.chunks = []*memChunk{} + s4.mmappedChunks = []*mmappedChunk{} // Truncation need not be aligned. testutil.Ok(t, h.Truncate(1)) testutil.Ok(t, h.Truncate(2000)) - testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - }, h.series.getByID(s1.ref).chunks) + testutil.Equals(t, []*mmappedChunk{ + {minTime: 2000, maxTime: 2999}, + }, h.series.getByID(s1.ref).mmappedChunks) - testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, - }, h.series.getByID(s2.ref).chunks) + testutil.Equals(t, []*mmappedChunk{ + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, + }, h.series.getByID(s2.ref).mmappedChunks) testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") testutil.Assert(t, h.series.getByID(s4.ref) == nil, "") @@ -382,36 +383,57 @@ func TestHead_Truncate(t *testing.T) { // Validate various behaviors brought on by firstChunkID accounting for // garbage collected chunks. func TestMemSeries_truncateChunks(t *testing.T) { - s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) + dir, err := ioutil.TempDir("", "truncate_chunks") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, chunkDiskMapper.Close()) + }() + + memChunkPool := sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + } + + s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) testutil.Assert(t, ok == true, "sample append failed") } // Check that truncate removes half of the chunks and afterwards // that the ID of the last chunk still gives us the same chunk afterwards. - countBefore := len(s.chunks) + countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.chunkID(countBefore - 1) - lastChunk := s.chunk(lastID) + lastChunk, _ := s.chunk(lastID, chunkDiskMapper) - testutil.Assert(t, s.chunk(0) != nil, "") + chk, _ := s.chunk(0, chunkDiskMapper) + testutil.Assert(t, chk != nil, "") testutil.Assert(t, lastChunk != nil, "") s.truncateChunksBefore(2000) - testutil.Equals(t, int64(2000), s.chunks[0].minTime) - testutil.Assert(t, s.chunk(0) == nil, "first chunks not gone") - testutil.Equals(t, countBefore/2, len(s.chunks)) - testutil.Equals(t, lastChunk, s.chunk(lastID)) + testutil.Equals(t, int64(2000), s.mmappedChunks[0].minTime) + chk, _ = s.chunk(0, chunkDiskMapper) + testutil.Assert(t, chk == nil, "first chunks not gone") + testutil.Equals(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. + chk, _ = s.chunk(lastID, chunkDiskMapper) + testutil.Equals(t, lastChunk, chk) // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. - it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil) + it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") - it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil) + it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } @@ -432,20 +454,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - dir, err := ioutil.TempDir("", "test_delete_series") - testutil.Ok(t, err) + head, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, head.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - testutil.Ok(t, head.Init(math.MinInt64)) testutil.Ok(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) @@ -506,23 +522,12 @@ func TestHeadDeleteSimple(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { for _, c := range cases { - dir, err := ioutil.TempDir("", "test_wal_reload") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress) - testutil.Ok(t, err) - defer w.Close() - - head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer head.Close() + head, w, closer := newTestHead(t, 1000, compress) + defer closer() app := head.Appender() for _, smpl := range smplsAll { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) } @@ -536,7 +541,7 @@ func TestHeadDeleteSimple(t *testing.T) { // Add more samples. app = head.Appender() for _, smpl := range c.addSamples { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) } @@ -545,10 +550,8 @@ func TestHeadDeleteSimple(t *testing.T) { // Compare the samples for both heads - before and after the reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) - defer reloadedW.Close() - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, DefaultStripeSize) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) - defer reloadedHead.Close() testutil.Ok(t, reloadedHead.Init(0)) // Compare the query results for both heads - before and after the reload. @@ -559,6 +562,7 @@ func TestHeadDeleteSimple(t *testing.T) { actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) + testutil.Ok(t, q.Close()) expSeriesSet := newMockSeriesSet([]storage.Series{ newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) @@ -596,10 +600,13 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { + hb, _, closer := newTestHead(t, 1000000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() + numSamples := int64(10) - hb, err := NewHead(nil, nil, nil, 1000000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() app := hb.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -640,19 +647,12 @@ func TestDeleteUntilCurMax(t *testing.T) { } func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { - dir, err := ioutil.TempDir("", "test_delete_wal") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) + numSamples := 10000 // Enough samples to cause a checkpoint. - numSamples := 10000 - hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, w, closer := newTestHead(t, int64(numSamples)*10, false) + defer closer() + for i := 0; i < numSamples; i++ { app := hb.Appender() _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) @@ -664,11 +664,11 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { testutil.Ok(t, hb.Close()) // Confirm there's been a checkpoint. - cdir, _, err := wal.LastCheckpoint(dir) + cdir, _, err := wal.LastCheckpoint(w.Dir()) testutil.Ok(t, err) // Read in checkpoint and WAL. recs := readTestWAL(t, cdir) - recs = append(recs, readTestWAL(t, dir)...) + recs = append(recs, readTestWAL(t, w.Dir())...) var series, samples, stones int for _, rec := range recs { @@ -740,13 +740,13 @@ func TestDelete_e2e(t *testing.T) { for _, l := range lbls { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - dir, _ := ioutil.TempDir("", "test") + + hb, _, closer := newTestHead(t, 100000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, hb.Close()) }() - hb, err := NewHead(nil, nil, nil, 100000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + app := hb.Appender() for _, l := range lbls { ls := labels.New(l...) @@ -926,68 +926,85 @@ func TestComputeChunkEndTime(t *testing.T) { } func TestMemSeries_append(t *testing.T) { - s := newMemSeries(labels.Labels{}, 1, 500) + dir, err := ioutil.TempDir("", "append") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, chunkDiskMapper.Close()) + }() + + s := newMemSeries(labels.Labels{}, 1, 500, nil) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0) + ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0) + ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3, 0) + ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0) + ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - testutil.Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range") - testutil.Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range") + testutil.Assert(t, len(s.mmappedChunks) == 1, "there should be only 1 mmapped chunk") + testutil.Assert(t, s.mmappedChunks[0].minTime == 998 && s.mmappedChunks[0].maxTime == 999, "wrong chunk range") + testutil.Assert(t, s.headChunk.minTime == 1000 && s.headChunk.maxTime == 1001, "wrong chunk range") // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0) + ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") } - testutil.Assert(t, len(s.chunks) > 7, "expected intermediate chunks") + testutil.Assert(t, len(s.mmappedChunks)+1 > 7, "expected intermediate chunks") // All chunks but the first and last should now be moderately full. - for i, c := range s.chunks[1 : len(s.chunks)-1] { - testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) + for i, c := range s.mmappedChunks[1:] { + chk, err := chunkDiskMapper.Chunk(c.ref) + testutil.Ok(t, err) + testutil.Assert(t, chk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, chk.NumSamples()) } } func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -1004,7 +1021,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 1500, nil) - _, err = cr.Chunk(chunks[0].Ref) + _, err := cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1019,27 +1036,29 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -1056,7 +1075,7 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 2000, nil) - _, err = cr.Chunk(chunks[0].Ref) + _, err := cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1072,15 +1091,17 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") - _, err = app.Add(lset, 2100, 1) + _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) @@ -1100,15 +1121,17 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") - _, err = app.Add(lset, 2100, 1) + _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) @@ -1134,20 +1157,14 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_rollback") - testutil.Ok(t, err) + h, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() - h, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - app := h.Appender() - _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) + _, err := app.Add(labels.FromStrings("a", "b"), 1, 2) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) @@ -1231,7 +1248,7 @@ func TestWalRepair_DecodingError(t *testing.T) { testutil.Ok(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1271,18 +1288,78 @@ func TestWalRepair_DecodingError(t *testing.T) { } } -func TestNewWalSegmentOnTruncate(t *testing.T) { - dir, err := ioutil.TempDir("", "test_wal_segments") +func TestHeadReadWriterRepair(t *testing.T) { + dir, err := ioutil.TempDir("", "head_read_writer_repair") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) - h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + const chunkRange = chunks.DefaultHeadChunkFileMaxTimeRange // to hold 4 chunks per segment. + + walDir := filepath.Join(dir, "wal") + // Fill the chunk segments and corrupt it. + { + w, err := wal.New(nil, nil, walDir, false) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) + testutil.Ok(t, h.Init(math.MinInt64)) + + s, created := h.getOrCreate(1, labels.FromStrings("a", "1")) + testutil.Assert(t, created, "series was not created") + + for i := 0; i < 7; i++ { + ok, chunkCreated := s.append(int64(i*int(chunkRange)), float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + testutil.Assert(t, ok, "series append failed") + testutil.Assert(t, chunkCreated, "chunk was not created") + ok, chunkCreated = s.append(int64(i*int(chunkRange))+chunkRange-1, float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + testutil.Assert(t, ok, "series append failed") + testutil.Assert(t, !chunkCreated, "chunk was created") + } + testutil.Ok(t, h.Close()) + + // Verify that there are 6 segment files. + files, err := ioutil.ReadDir(mmappedChunksDir(dir)) + testutil.Ok(t, err) + testutil.Equals(t, 6, len(files)) + + // Corrupt the 4th file by writing a random byte to series ref. + f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666) + testutil.Ok(t, err) + n, err := f.WriteAt([]byte{67, 88}, chunks.HeadChunkFileHeaderSize+2) + testutil.Ok(t, err) + testutil.Equals(t, 2, n) + testutil.Ok(t, f.Close()) + } + + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.mmapChunkCorruptionTotal)) + } + + // Verify that there are 3 segment files after the repair. + // The segments from the corrupt segment should be removed. + { + files, err := ioutil.ReadDir(mmappedChunksDir(dir)) + testutil.Ok(t, err) + testutil.Equals(t, 3, len(files)) + } +} + +func TestNewWalSegmentOnTruncate(t *testing.T) { + h, wlog, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() add := func(ts int64) { app := h.Appender() _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0) @@ -1309,21 +1386,15 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - dir, err := ioutil.TempDir("", "test_duplicate_label_name") - testutil.Ok(t, err) + h, _, closer := newTestHead(t, 1000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) - - h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() add := func(labels labels.Labels, labelName string) { app := h.Appender() - _, err = app.Add(labels, 0, 0) + _, err := app.Add(labels, 0, 0) testutil.NotOk(t, err) testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) } @@ -1335,22 +1406,20 @@ func TestAddDuplicateLabelName(t *testing.T) { func TestMemSeriesIsolation(t *testing.T) { // Put a series, select it. GC it and then access it. - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() - lastValue := func(maxAppendID uint64) int { - idx, err := hb.Index() + lastValue := func(h *Head, maxAppendID uint64) int { + idx, err := h.Index() + testutil.Ok(t, err) - iso := hb.iso.State() + iso := h.iso.State() iso.maxAppendID = maxAppendID querier := &blockQuerier{ mint: 0, maxt: 10000, index: idx, - chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso), + chunks: h.chunksRange(math.MinInt64, math.MaxInt64, iso), tombstones: tombstones.NewMemTombstones(), } @@ -1368,50 +1437,61 @@ func TestMemSeriesIsolation(t *testing.T) { return -1 } - i := 1 - for ; i <= 1000; i++ { - var app storage.Appender - // To initialize bounds. - if hb.MinTime() == math.MaxInt64 { - app = &initAppender{head: hb} - } else { - a := hb.appender() - a.cleanupAppendIDsBelow = 0 - app = a - } + addSamples := func(h *Head) int { + i := 1 + for ; i <= 1000; i++ { + var app storage.Appender + // To initialize bounds. + if h.MinTime() == math.MaxInt64 { + app = &initAppender{head: h} + } else { + a := h.appender() + a.cleanupAppendIDsBelow = 0 + app = a + } - _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + return i } + testIsolation := func(h *Head, i int) { + } + + // Test isolation without restart of Head. + hb, _, closer := newTestHead(t, 1000, false) + i := addSamples(hb) + testIsolation(hb, i) + // Test simple cases in different chunks when no appendID cleanup has been performed. - testutil.Equals(t, 10, lastValue(10)) - testutil.Equals(t, 130, lastValue(130)) - testutil.Equals(t, 160, lastValue(160)) - testutil.Equals(t, 240, lastValue(240)) - testutil.Equals(t, 500, lastValue(500)) - testutil.Equals(t, 750, lastValue(750)) - testutil.Equals(t, 995, lastValue(995)) - testutil.Equals(t, 999, lastValue(999)) + testutil.Equals(t, 10, lastValue(hb, 10)) + testutil.Equals(t, 130, lastValue(hb, 130)) + testutil.Equals(t, 160, lastValue(hb, 160)) + testutil.Equals(t, 240, lastValue(hb, 240)) + testutil.Equals(t, 500, lastValue(hb, 500)) + testutil.Equals(t, 750, lastValue(hb, 750)) + testutil.Equals(t, 995, lastValue(hb, 995)) + testutil.Equals(t, 999, lastValue(hb, 999)) // Cleanup appendIDs below 500. app := hb.appender() app.cleanupAppendIDsBelow = 500 - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) i++ // We should not get queries with a maxAppendID below 500 after the cleanup, // but they only take the remaining appendIDs into account. - testutil.Equals(t, 499, lastValue(10)) - testutil.Equals(t, 499, lastValue(130)) - testutil.Equals(t, 499, lastValue(160)) - testutil.Equals(t, 499, lastValue(240)) - testutil.Equals(t, 500, lastValue(500)) - testutil.Equals(t, 995, lastValue(995)) - testutil.Equals(t, 999, lastValue(999)) + testutil.Equals(t, 499, lastValue(hb, 10)) + testutil.Equals(t, 499, lastValue(hb, 130)) + testutil.Equals(t, 499, lastValue(hb, 160)) + testutil.Equals(t, 499, lastValue(hb, 240)) + testutil.Equals(t, 500, lastValue(hb, 500)) + testutil.Equals(t, 995, lastValue(hb, 995)) + testutil.Equals(t, 999, lastValue(hb, 999)) // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. @@ -1420,12 +1500,12 @@ func TestMemSeriesIsolation(t *testing.T) { _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - testutil.Equals(t, 999, lastValue(998)) - testutil.Equals(t, 999, lastValue(999)) - testutil.Equals(t, 1000, lastValue(1000)) - testutil.Equals(t, 1001, lastValue(1001)) - testutil.Equals(t, 1002, lastValue(1002)) - testutil.Equals(t, 1002, lastValue(1003)) + testutil.Equals(t, 999, lastValue(hb, 998)) + testutil.Equals(t, 999, lastValue(hb, 999)) + testutil.Equals(t, 1000, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1002, lastValue(hb, 1002)) + testutil.Equals(t, 1002, lastValue(hb, 1003)) i++ // Cleanup appendIDs below 1001, but with a rollback. @@ -1434,21 +1514,71 @@ func TestMemSeriesIsolation(t *testing.T) { _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) - testutil.Equals(t, 1000, lastValue(999)) - testutil.Equals(t, 1000, lastValue(1000)) - testutil.Equals(t, 1001, lastValue(1001)) - testutil.Equals(t, 1002, lastValue(1002)) - testutil.Equals(t, 1002, lastValue(1003)) + testutil.Equals(t, 1000, lastValue(hb, 999)) + testutil.Equals(t, 1000, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1002, lastValue(hb, 1002)) + testutil.Equals(t, 1002, lastValue(hb, 1003)) + + testutil.Ok(t, hb.Close()) + closer() + + // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. + hb, w, closer := newTestHead(t, 1000, false) + defer closer() + i = addSamples(hb) + testutil.Ok(t, hb.Close()) + + wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) + testutil.Ok(t, err) + hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize) + defer func() { testutil.Ok(t, hb.Close()) }() + testutil.Ok(t, err) + testutil.Ok(t, hb.Init(0)) + + // No appends after restarting. Hence all should return the last value. + testutil.Equals(t, 1000, lastValue(hb, 10)) + testutil.Equals(t, 1000, lastValue(hb, 130)) + testutil.Equals(t, 1000, lastValue(hb, 160)) + testutil.Equals(t, 1000, lastValue(hb, 240)) + testutil.Equals(t, 1000, lastValue(hb, 500)) + + // Cleanup appendIDs below 1000, which means the sample buffer is + // the only thing with appendIDs. + app = hb.appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + i++ + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 1001, lastValue(hb, 998)) + testutil.Equals(t, 1001, lastValue(hb, 999)) + testutil.Equals(t, 1001, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1001, lastValue(hb, 1002)) + testutil.Equals(t, 1001, lastValue(hb, 1003)) + + // Cleanup appendIDs below 1002, but with a rollback. + app = hb.appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Rollback()) + testutil.Equals(t, 1001, lastValue(hb, 999)) + testutil.Equals(t, 1001, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1001, lastValue(hb, 1002)) + testutil.Equals(t, 1001, lastValue(hb, 1003)) } func TestIsolationRollback(t *testing.T) { // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app := hb.Appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) @@ -1469,12 +1599,14 @@ func TestIsolationRollback(t *testing.T) { } func TestIsolationLowWatermarkMonotonous(t *testing.T) { - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app1 := hb.Appender() - _, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app1.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") @@ -1501,15 +1633,17 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { } func TestIsolationAppendIDZeroIsNoop(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) - ok, _ := s.append(0, 0, 0) + ok, _ := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "Series append failed.") testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } @@ -1521,25 +1655,110 @@ func TestHeadSeriesChunkRace(t *testing.T) { } func TestIsolationWithoutAdd(t *testing.T) { - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app := hb.Appender() testutil.Ok(t, app.Commit()) app = hb.Appender() - _, err = app.Add(labels.FromStrings("foo", "baz"), 1, 1) + _, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") } -func testHeadSeriesChunkRace(t *testing.T) { - h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) +func TestOutOfOrderSamplesMetric(t *testing.T) { + dir, err := ioutil.TempDir("", "test") testutil.Ok(t, err) - defer h.Close() + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + db, err := Open(dir, nil, nil, DefaultOptions()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + db.DisableCompactions() + + app := db.Appender() + for i := 1; i <= 5; i++ { + _, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Test out of order metric. + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 2, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), 3, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), 4, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + testutil.Ok(t, app.Commit()) + + // Compact Head to test out of bound metric. + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Equals(t, int64(math.MinInt64), db.head.minValidTime) + testutil.Ok(t, db.Compact()) + testutil.Assert(t, db.head.minValidTime > 0, "") + + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-2, 99) + testutil.Equals(t, storage.ErrOutOfBounds, err) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-1, 99) + testutil.Equals(t, storage.ErrOutOfBounds, err) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) + testutil.Ok(t, app.Commit()) + + // Some more valid samples for out of order. + app = db.Appender() + for i := 1; i <= 5; i++ { + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+int64(i), 99) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Test out of order metric. + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+2, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+3, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+4, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + testutil.Ok(t, app.Commit()) +} + +func testHeadSeriesChunkRace(t *testing.T) { + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() testutil.Ok(t, h.Init(0)) app := h.Appender() @@ -1568,3 +1787,19 @@ func testHeadSeriesChunkRace(t *testing.T) { testutil.Ok(t, ss.Err()) wg.Wait() } + +func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL, func()) { + dir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize) + testutil.Ok(t, err) + + testutil.Ok(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) + + return h, wlog, func() { + testutil.Ok(t, os.RemoveAll(dir)) + } +} diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 3d0a3fbcf6..2a03852f20 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -30,7 +30,12 @@ const ( ) func BenchmarkPostingsForMatchers(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer func() { testutil.Ok(b, h.Close()) @@ -126,7 +131,12 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { } func BenchmarkQuerierSelect(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() app := h.Appender() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index d6f8d10aaa..8cf13249aa 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1857,7 +1857,12 @@ func TestFindSetMatches(t *testing.T) { } func TestPostingsForMatchers(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(t, err) defer func() { testutil.Ok(t, h.Close()) @@ -2220,7 +2225,12 @@ func BenchmarkQueries(b *testing.B) { queryTypes["_3-Blocks"] = &querier{blocks: qs[0:3]} queryTypes["_10-Blocks"] = &querier{blocks: qs} - head := createHead(b, series) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + head := createHead(b, series, chunkDir) qHead, err := NewBlockQuerier(head, 1, int64(nSamples)) testutil.Ok(b, err) queryTypes["_Head"] = qHead @@ -2232,6 +2242,7 @@ func BenchmarkQueries(b *testing.B) { benchQuery(b, expExpansions, querier, selectors) }) } + testutil.Ok(b, head.Close()) } } } diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 5246edd64f..8a26e91e79 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -32,8 +32,9 @@ type MetricSample struct { } // CreateHead creates a TSDB writer head to write the sample data to. -func CreateHead(samples []*MetricSample, chunkRange int64, logger log.Logger) (*Head, error) { - head, err := NewHead(nil, logger, nil, chunkRange, DefaultStripeSize) +func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logger log.Logger) (*Head, error) { + head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize) + if err != nil { return nil, err } @@ -60,10 +61,15 @@ func CreateBlock(samples []*MetricSample, dir string, mint, maxt int64, logger l if chunkRange < 0 { return "", ErrInvalidTimes } - head, err := CreateHead(samples, chunkRange, logger) + chunkDir := filepath.Join(dir, "chunks_tmp") + defer func() { + os.RemoveAll(chunkDir) + }() + head, err := CreateHead(samples, chunkRange, chunkDir, logger) if err != nil { return "", err } + defer head.Close() compactor, err := NewLeveledCompactor(context.Background(), nil, logger, ExponentialBlockRanges(DefaultBlockDuration, 3, 5), nil) if err != nil { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index e0c7c74f9f..d693feb4e9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1898,8 +1898,18 @@ type fakeDB struct { func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err } func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } -func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) { - h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) +func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { + dbDir, err := ioutil.TempDir("", "tsdb-api-ready") + if err != nil { + return nil, err + } + defer func() { + err := os.RemoveAll(dbDir) + if retErr != nil { + retErr = err + } + }() + h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize) return h.Stats(statsByLabelName), nil } diff --git a/web/web_test.go b/web/web_test.go index 23fbf3e490..96c4c5c4f5 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -100,6 +100,7 @@ func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) { func TestReadyAndHealthy(t *testing.T) { t.Parallel() + dbDir, err := ioutil.TempDir("", "tsdb-ready") testutil.Ok(t, err) defer testutil.Ok(t, os.RemoveAll(dbDir)) @@ -426,7 +427,6 @@ func TestDebugHandler(t *testing.T) { func TestHTTPMetrics(t *testing.T) { t.Parallel() - handler := New(nil, &Options{ RoutePrefix: "/", ListenAddress: "somehost:9090",