diff --git a/cmd/prometheus/query_log_test.go b/cmd/prometheus/query_log_test.go index 12b2f930f1..eac6f6ec2a 100644 --- a/cmd/prometheus/query_log_test.go +++ b/cmd/prometheus/query_log_test.go @@ -246,7 +246,19 @@ func (p *queryLogTest) run(t *testing.T) { p.setQueryLog(t, "") } - params := append([]string{"-test.main", "--config.file=" + p.configFile.Name(), "--web.enable-lifecycle", fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port)}, p.params()...) + dir, err := ioutil.TempDir("", "query_log_test") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + params := append([]string{ + "-test.main", + "--config.file=" + p.configFile.Name(), + "--web.enable-lifecycle", + fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port), + "--storage.tsdb.path=" + dir, + }, p.params()...) prom := exec.Command(promPath, params...) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index df18a08ace..529ed771a9 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -305,7 +305,12 @@ func TestReadIndexFormatV1(t *testing.T) { // createBlock creates a block with given set of series and returns its dir. 
func createBlock(tb testing.TB, dir string, series []storage.Series) string { - return createBlockFromHead(tb, dir, createHead(tb, series)) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(tb, err) + defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }() + head := createHead(tb, series, chunkDir) + defer func() { testutil.Ok(tb, head.Close()) }() + return createBlockFromHead(tb, dir, head) } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { @@ -321,10 +326,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { return filepath.Join(dir, ulid.String()) } -func createHead(tb testing.TB, series []storage.Series) *Head { - head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize) +func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head { + head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(tb, err) - defer head.Close() app := head.Appender() for _, s := range series { diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index fd111c2dfc..52e20f866b 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -72,13 +72,13 @@ const ( ) // corruptionErr is an error that's returned when corruption is encountered. -type corruptionErr struct { +type CorruptionErr struct { Dir string FileIndex int Err error } -func (e *corruptionErr) Error() string { +func (e *CorruptionErr) Error() string { return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() } @@ -512,7 +512,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) { // and runs the provided function on each chunk. It returns on the first error encountered. // NOTE: This method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. 
-func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) { +func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error) (err error) { cdm.writePathMtx.Lock() defer cdm.writePathMtx.Unlock() @@ -550,7 +550,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, if allZeros { break } - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), @@ -577,12 +577,15 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, idx += ChunkEncodingSize // Skip encoding. dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) - idx += n + int(dataLen) // Skip the data. + idx += n + + numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2)) + idx += int(dataLen) // Skip the data. // In the beginning we only checked for the chunk meta size. // Now that we have added the chunk data length, we check for sufficient bytes again. 
if idx+CRCSize > fileEnd { - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), @@ -595,7 +598,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, return err } if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), @@ -607,14 +610,14 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mmapFile.maxt = maxt } - if err := f(seriesRef, chunkRef, mint, maxt); err != nil { + if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil { return err } } if idx > fileEnd { // It should be equal to the slice length. - return &corruptionErr{ + return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), @@ -678,11 +681,11 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { return nil } -// Repair deletes all the head chunk files after the one which had the corruption +// DeleteCorrupted deletes all the head chunk files after the one which had the corruption // (including the corrupt file). -func (cdm *ChunkDiskMapper) Repair(originalErr error) error { +func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. 
-	cerr, ok := err.(*corruptionErr)
+	cerr, ok := err.(*CorruptionErr)
 	if !ok {
 		return errors.Wrap(originalErr, "cannot handle error")
 	}
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index fbea37aeea..073a16955d 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -39,6 +39,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	type expectedDataType struct {
 		seriesRef, chunkRef uint64
 		mint, maxt          int64
+		numSamples          uint16
 		chunk               chunkenc.Chunk
 	}
 	expectedData := []expectedDataType{}
@@ -51,11 +52,12 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 			seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
 			totalChunks++
 			expectedData = append(expectedData, expectedDataType{
-				seriesRef: seriesRef,
-				mint:      mint,
-				maxt:      maxt,
-				chunkRef:  chkRef,
-				chunk:     chunk,
+				seriesRef:  seriesRef,
+				mint:       mint,
+				maxt:       maxt,
+				chunkRef:   chkRef,
+				chunk:      chunk,
+				numSamples: uint16(chunk.NumSamples()),
 			})
 
 			if hrw.curFileSequence != 1 {
@@ -128,7 +130,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	testutil.Ok(t, err)
 
 	idx := 0
-	err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error {
+	err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
 		t.Helper()
 
 		expData := expectedData[idx]
@@ -136,6 +138,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 		testutil.Equals(t, expData.chunkRef, chunkRef)
-		testutil.Equals(t, expData.maxt, maxt)
+		testutil.Equals(t, expData.mint, mint)
 		testutil.Equals(t, expData.maxt, maxt)
+		testutil.Equals(t, expData.numSamples, numSamples)
 
 		actChunk, err := hrw.Chunk(expData.chunkRef)
 		testutil.Ok(t, err)
@@ -157,7 +160,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
 	}()
 
 	testutil.Assert(t, !hrw.fileMaxtSet, "")
-	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil }))
+	testutil.Ok(t, hrw.IterateAllChunks(func(_, _
uint64, _, _ int64, _ uint16) error { return nil })) testutil.Assert(t, hrw.fileMaxtSet, "") timeRange := 0 @@ -227,7 +230,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) { testutil.Ok(t, err) testutil.Assert(t, !hrw.fileMaxtSet, "") - testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) + testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) testutil.Assert(t, hrw.fileMaxtSet, "") // Truncating files after restart. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3b72fe02cc..a733c1e990 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -870,7 +870,12 @@ func BenchmarkCompactionFromHead(b *testing.B) { for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { labelValues := totalSeries / labelNames b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender() diff --git a/tsdb/db.go b/tsdb/db.go index e97499c041..17a69b01a8 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -309,7 +309,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { if err != nil { return err } - head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize) + head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) if err != nil { return err } @@ -368,7 +368,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu blocks[i] = b } - head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize) + head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize) if err != nil { return nil, err } @@ -379,11 +379,14 @@ func (db 
*DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu // Also add the WAL if the current blocks don't cover the requests time range. if maxBlockTime <= maxt { + if err := head.Close(); err != nil { + return nil, err + } w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) if err != nil { return nil, err } - head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize) + head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize) if err != nil { return nil, err } @@ -395,10 +398,10 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu // Set the wal to nil to disable all wal operations. // This is mainly to avoid blocking when closing the head. head.wal = nil - - db.closers = append(db.closers, head) } + db.closers = append(db.closers, head) + // TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance. // Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite. // Option 2: refactor Querier to use another independent func which @@ -583,19 +586,21 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs var wlog *wal.WAL segmentSize := wal.DefaultSegmentSize + walDir := filepath.Join(dir, "wal") // Wal is enabled. if opts.WALSegmentSize >= 0 { // Wal is set to a custom size. 
if opts.WALSegmentSize > 0 { segmentSize = opts.WALSegmentSize } - wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression) + wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression) if err != nil { return nil, err } } - db.head, err = NewHead(r, l, wlog, rngs[0], opts.StripeSize) + db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize) + if err != nil { return nil, err } @@ -1018,9 +1023,10 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo deletable = make(map[ulid.ULID]*Block) walSize, _ := db.Head().wal.Size() - // Initializing size counter with WAL size, - // as that is part of the retention strategy. - blocksSize := walSize + headChunksSize := db.Head().chunkDiskMapper.Size() + // Initializing size counter with WAL size and Head chunks + // written to disk, as that is part of the retention strategy. + blocksSize := walSize + headChunksSize for i, block := range blocks { blocksSize += block.Size() if blocksSize > int64(db.opts.MaxBytes) { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3ce6a3c2a6..6c12147a94 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -849,8 +849,15 @@ func TestWALSegmentSizeOptions(t *testing.T) { tests := map[int]func(dbdir string, segmentSize int){ // Default Wal Size. 0: func(dbDir string, segmentSize int) { - files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) + filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) testutil.Ok(t, err) + files := []os.FileInfo{} + for _, f := range filesAndDir { + if !f.IsDir() { + files = append(files, f) + } + } + // All the full segment files (all but the last) should match the segment size option. 
for _, f := range files[:len(files)-1] { testutil.Equals(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) } @@ -859,9 +866,16 @@ func TestWALSegmentSizeOptions(t *testing.T) { }, // Custom Wal Size. 2 * 32 * 1024: func(dbDir string, segmentSize int) { - files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) - testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") + filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) testutil.Ok(t, err) + files := []os.FileInfo{} + for _, f := range filesAndDir { + if !f.IsDir() { + files = append(files, f) + } + } + testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.") + // All the full segment files (all but the last) should match the segment size option. for _, f := range files[:len(files)-1] { testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) } @@ -870,9 +884,12 @@ func TestWALSegmentSizeOptions(t *testing.T) { }, // Wal disabled. -1: func(dbDir string, segmentSize int) { - if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) { - t.Fatal("wal directory is present when the wal is disabled") - } + // Check that WAL dir is not there. + _, err := os.Stat(filepath.Join(dbDir, "wal")) + testutil.NotOk(t, err) + // Check that there is chunks dir. 
+ _, err = os.Stat(mmappedChunksDir(dbDir)) + testutil.Ok(t, err) }, } for segmentSize, testFunc := range tests { diff --git a/tsdb/docs/format/head_chunks.md b/tsdb/docs/format/head_chunks.md index 32657e8044..5a320fa761 100644 --- a/tsdb/docs/format/head_chunks.md +++ b/tsdb/docs/format/head_chunks.md @@ -33,4 +33,4 @@ Unlike chunks in the on-disk blocks, here we additionally store series reference ┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐ | series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len | data │ CRC32 <4 byte> │ └─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘ -``` +``` \ No newline at end of file diff --git a/tsdb/head.go b/tsdb/head.go index 4b374fd5d4..d56f167e31 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -16,6 +16,7 @@ package tsdb import ( "fmt" "math" + "path/filepath" "runtime" "sort" "strings" @@ -32,6 +33,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/tombstones" @@ -54,12 +56,13 @@ type Head struct { minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. lastSeriesID uint64 - metrics *headMetrics - wal *wal.WAL - logger log.Logger - appendPool sync.Pool - seriesPool sync.Pool - bytesPool sync.Pool + metrics *headMetrics + wal *wal.WAL + logger log.Logger + appendPool sync.Pool + seriesPool sync.Pool + bytesPool sync.Pool + memChunkPool sync.Pool // All series addressable by their ID or hash. 
series *stripeSeries @@ -80,27 +83,35 @@ type Head struct { cardinalityMutex sync.Mutex cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. + + // chunkDiskMapper is used to write and read Head chunks to/from disk. + chunkDiskMapper *chunks.ChunkDiskMapper + // chunkDirRoot is the parent directory of the chunks directory. + chunkDirRoot string } type headMetrics struct { - activeAppenders prometheus.Gauge - series prometheus.GaugeFunc - seriesCreated prometheus.Counter - seriesRemoved prometheus.Counter - seriesNotFound prometheus.Counter - chunks prometheus.Gauge - chunksCreated prometheus.Counter - chunksRemoved prometheus.Counter - gcDuration prometheus.Summary - samplesAppended prometheus.Counter - walTruncateDuration prometheus.Summary - walCorruptionsTotal prometheus.Counter - headTruncateFail prometheus.Counter - headTruncateTotal prometheus.Counter - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter + activeAppenders prometheus.Gauge + series prometheus.GaugeFunc + seriesCreated prometheus.Counter + seriesRemoved prometheus.Counter + seriesNotFound prometheus.Counter + chunks prometheus.Gauge + chunksCreated prometheus.Counter + chunksRemoved prometheus.Counter + gcDuration prometheus.Summary + samplesAppended prometheus.Counter + outOfBoundSamples prometheus.Counter + outOfOrderSamples prometheus.Counter + walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter + headTruncateFail prometheus.Counter + headTruncateTotal prometheus.Counter + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + mmapChunkCorruptionTotal prometheus.Counter } func 
newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -155,6 +166,14 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }), + outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_out_of_bound_samples_total", + Help: "Total number of out of bound samples ingestion failed attempts.", + }), + outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_out_of_order_samples_total", + Help: "Total number of out of order samples ingestion failed attempts.", + }), headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_truncations_failed_total", Help: "Total number of head truncations that failed.", @@ -179,6 +198,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_checkpoint_creations_total", Help: "Total number of checkpoint creations attempted.", }), + mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_mmap_chunk_corruptions_total", + Help: "Total number of memory-mapped chunk corruptions.", + }), } if r != nil { @@ -195,12 +218,15 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.walTruncateDuration, m.walCorruptionsTotal, m.samplesAppended, + m.outOfBoundSamples, + m.outOfOrderSamples, m.headTruncateFail, m.headTruncateTotal, m.checkpointDeleteFail, m.checkpointDeleteTotal, m.checkpointCreationFail, m.checkpointCreationTotal, + m.mmapChunkCorruptionTotal, // Metrics bound to functions and not needed in tests // can be created and registered on the spot. prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -258,7 +284,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings // stripeSize sets the number of entries in the hash map, it must be a power of 2. 
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. -func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int) (*Head, error) { if l == nil { l = log.NewNopLogger() } @@ -278,12 +304,30 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int tombstones: tombstones.NewMemTombstones(), iso: newIsolation(), deleted: map[uint64]int{}, + memChunkPool: sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + }, + chunkDirRoot: chkDirRoot, } h.metrics = newHeadMetrics(h, r) + if pool == nil { + pool = chunkenc.NewPool() + } + + var err error + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), pool) + if err != nil { + return nil, err + } + return h, nil } +func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } + // processWALSamples adds a partition of samples it receives to the head and passes // them on to other workers. // Samples before the mint timestamp are discarded. 
@@ -312,7 +356,7 @@ func (h *Head) processWALSamples( } refSeries[s.Ref] = ms } - if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -351,7 +395,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } } -func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { +func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -472,7 +516,20 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { for _, s := range v { series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - if !created { + if created { + // If this series gets a duplicate record, we don't restore its mmapped chunks, + // and instead restore everything from WAL records. + series.mmappedChunks = mmappedChunks[series.ref] + + h.metrics.chunks.Add(float64(len(series.mmappedChunks))) + h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks))) + + if len(series.mmappedChunks) > 0 { + h.updateMinMaxTime(series.minTime(), series.maxTime()) + } + } else { + // TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID. + // There's already a different ref for this series. 
multiRef[s.Ref] = series.ref }
@@ -572,8 +629,20 @@ func (h *Head) Init(minValidTime int64) error {
 		return nil
 	}
 
-	level.Info(h.logger).Log("msg", "Replaying WAL, this may take awhile")
+	level.Info(h.logger).Log("msg", "Replaying WAL and on-disk memory mappable chunks if any, this may take a while")
 	start := time.Now()
+
+	mmappedChunks, err := h.loadMmappedChunks()
+	if err != nil {
+		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
+		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
+			h.metrics.mmapChunkCorruptionTotal.Inc()
+		}
+		// Keep the chunks that can still be m-mapped after the corrupted files are deleted.
+		// Anything else is recovered from the WAL, so no data is lost (given WAL is not corrupt).
+		mmappedChunks = h.removeCorruptedMmappedChunks(err)
+	}
+
 	// Backfill the checkpoint first if it exists.
 	dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
 	if err != nil && err != record.ErrNotFound {
@@ -593,7 +662,7 @@ func (h *Head) Init(minValidTime int64) error {
 
 		// A corrupted checkpoint is a hard error for now and requires user
 		// intervention. There's likely little data that can be recovered anyway.
-		if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
+		if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil {
 			return errors.Wrap(err, "backfill checkpoint")
 		}
 		startFrom++
@@ -614,7 +683,7 @@ func (h *Head) Init(minValidTime int64) error {
 		}
 
 		sr := wal.NewSegmentBufReader(s)
-		err = h.loadWAL(wal.NewReader(sr), multiRef)
+		err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
 		if err := sr.Close(); err != nil {
 			level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
 		}
@@ -629,6 +698,54 @@ func (h *Head) Init(minValidTime int64) error {
 	return nil
 }
 
+func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
+	mmappedChunks := map[uint64][]*mmappedChunk{}
+	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
+		if maxt < h.minValidTime {
+			return nil
+		}
+
+		slice := mmappedChunks[seriesRef]
+		if len(slice) > 0 {
+			if slice[len(slice)-1].maxTime >= mint {
+				return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
+			}
+		}
+
+		slice = append(slice, &mmappedChunk{
+			ref:        chunkRef,
+			minTime:    mint,
+			maxTime:    maxt,
+			numSamples: numSamples,
+		})
+		mmappedChunks[seriesRef] = slice
+		return nil
+	}); err != nil {
+		return nil, errors.Wrap(err, "iterate on on-disk chunks")
+	}
+	return mmappedChunks, nil
+}
+
+// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and reloads the remaining ones.
+// It returns the reloaded chunks, or an empty map if either the deletion or the reload fails.
+func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk {
+	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
+
+	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
+		level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err)
+		return map[uint64][]*mmappedChunk{}
+	}
+
+	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
+	mmappedChunks, err := h.loadMmappedChunks()
+	if err != nil {
+		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
+		mmappedChunks = map[uint64][]*mmappedChunk{}
+	}
+
+	return mmappedChunks
+}
+
 // Truncate removes old data before mint from the head.
 func (h *Head) Truncate(mint int64) (err error) {
 	defer func() {
@@ -662,6 +779,11 @@ func (h *Head) Truncate(mint int64) (err error) {
 	level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
 	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
 
+	// Truncate the chunk m-mapper.
+ if err := h.chunkDiskMapper.Truncate(mint); err != nil { + return errors.Wrap(err, "truncate chunks.HeadReadWriter") + } + if h.wal == nil { return nil } @@ -947,6 +1069,7 @@ type headAppender struct { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() return 0, storage.ErrOutOfBounds } @@ -973,6 +1096,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < a.minValidTime { + a.head.metrics.outOfBoundSamples.Inc() return storage.ErrOutOfBounds } @@ -983,6 +1107,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { s.Lock() if err := s.appendable(t, v); err != nil { s.Unlock() + if err == storage.ErrOutOfOrderSample { + a.head.metrics.outOfOrderSamples.Inc() + } return err } s.pendingCommit = true @@ -1051,13 +1178,14 @@ func (a *headAppender) Commit() error { for i, s := range a.samples { series = a.sampleSeries[i] series.Lock() - ok, chunkCreated := series.append(s.T, s.V, a.appendID) + ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() if !ok { total-- + a.head.metrics.outOfOrderSamples.Inc() } if chunkCreated { a.head.metrics.chunks.Inc() @@ -1225,10 +1353,11 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReade mint = hmin } return &headChunkReader{ - head: h, - mint: mint, - maxt: maxt, - isoState: is, + head: h, + mint: mint, + maxt: maxt, + isoState: is, + memChunkPool: &h.memChunkPool, } } @@ -1271,16 +1400,19 @@ func (h *Head) compactable() bool { // Close flushes the WAL and closes the head. 
func (h *Head) Close() error { - if h.wal == nil { - return nil + var merr tsdb_errors.MultiError + merr.Add(h.chunkDiskMapper.Close()) + if h.wal != nil { + merr.Add(h.wal.Close()) } - return h.wal.Close() + return merr.Err() } type headChunkReader struct { - head *Head - mint, maxt int64 - isoState *isolationState + head *Head + mint, maxt int64 + isoState *isolationState + memChunkPool *sync.Pool } func (h *headChunkReader) Close() error { @@ -1315,10 +1447,17 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { } s.Lock() - c := s.chunk(int(cid)) + c, garbageCollect := s.chunk(int(cid), h.head.chunkDiskMapper) + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + c.chunk = nil + h.memChunkPool.Put(c) + } + }() - // This means that the chunk has been garbage collected or is outside - // the specified range. + // This means that the chunk has been garbage collected (or) is outside + // the specified range (or) Head is closing. 
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, storage.ErrNotFound @@ -1326,23 +1465,25 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Unlock() return &safeChunk{ - Chunk: c.chunk, - s: s, - cid: int(cid), - isoState: h.isoState, + Chunk: c.chunk, + s: s, + cid: int(cid), + isoState: h.isoState, + chunkDiskMapper: h.head.chunkDiskMapper, }, nil } type safeChunk struct { chunkenc.Chunk - s *memSeries - cid int - isoState *isolationState + s *memSeries + cid int + isoState *isolationState + chunkDiskMapper *chunks.ChunkDiskMapper } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, c.isoState, reuseIter) + it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) c.s.Unlock() return it } @@ -1448,23 +1589,24 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks *chks = (*chks)[:0] - for i, c := range s.chunks { + for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } - // Set the head chunks as open (being appended to). - maxTime := c.maxTime - if s.headChunk == c { - maxTime = math.MaxInt64 - } - *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, - MaxTime: maxTime, + MaxTime: c.maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) } + if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { + *chks = append(*chks, chunks.Meta{ + MinTime: s.headChunk.minTime, + MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). 
+ Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))), + }) + } return nil } @@ -1485,7 +1627,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) { } func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { - s := newMemSeries(lset, id, h.chunkRange) + s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool) s, created := h.series.getOrSet(hash, s) if !created { @@ -1611,7 +1753,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { series.Lock() rmChunks += series.truncateChunksBefore(mint) - if len(series.chunks) > 0 || series.pendingCommit { + if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit { series.Unlock() continue } @@ -1704,12 +1846,12 @@ func (s sample) V() float64 { type memSeries struct { sync.RWMutex - ref uint64 - lset labels.Labels - chunks []*memChunk - headChunk *memChunk - chunkRange int64 - firstChunkID int + ref uint64 + lset labels.Labels + mmappedChunks []*mmappedChunk + headChunk *memChunk + chunkRange int64 + firstChunkID int nextAt int64 // Timestamp at which to cut the next chunk. sampleBuf [4]sample @@ -1717,25 +1859,31 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. 
+ memChunkPool *sync.Pool + txs *txRing } -func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { +func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, - txs: newTxRing(4), + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, + txs: newTxRing(4), + memChunkPool: memChunkPool, } return s } func (s *memSeries) minTime() int64 { - if len(s.chunks) == 0 { - return math.MinInt64 + if len(s.mmappedChunks) > 0 { + return s.mmappedChunks[0].minTime } - return s.chunks[0].minTime + if s.headChunk != nil { + return s.headChunk.minTime + } + return math.MinInt64 } func (s *memSeries) maxTime() int64 { @@ -1746,30 +1894,45 @@ func (s *memSeries) maxTime() int64 { return c.maxTime } -func (s *memSeries) cut(mint int64) *memChunk { - c := &memChunk{ +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { + s.mmapCurrentHeadChunk(chunkDiskMapper) + + s.headChunk = &memChunk{ chunk: chunkenc.NewXORChunk(), minTime: mint, maxTime: math.MinInt64, } - s.chunks = append(s.chunks, c) - s.headChunk = c - - // Remove exceeding capacity from the previous chunk byte slice to save memory. - if l := len(s.chunks); l > 1 { - s.chunks[l-2].chunk.Compact() - } // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. s.nextAt = rangeForTimestamp(mint, s.chunkRange) - app, err := c.chunk.Appender() + app, err := s.headChunk.chunk.Appender() if err != nil { panic(err) } s.app = app - return c + return s.headChunk +} + +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { + if s.headChunk == nil { + // There is no head chunk, so nothing to m-map here. 
+ return + } + + chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) + if err != nil { + if err != chunks.ErrChunkDiskMapperClosed { + panic(err) + } + } + s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(s.headChunk.chunk.NumSamples()), + minTime: s.headChunk.minTime, + maxTime: s.headChunk.maxTime, + }) } // appendable checks whether the given sample is valid for appending to the series. @@ -1793,12 +1956,34 @@ func (s *memSeries) appendable(t int64, v float64) error { return nil } -func (s *memSeries) chunk(id int) *memChunk { +// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. +// If garbageCollect is true, it means that the returned *memChunk +// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. +func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool) { + // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are + // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. + // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix + // is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. ix := id - s.firstChunkID - if ix < 0 || ix >= len(s.chunks) { - return nil + if ix < 0 || ix > len(s.mmappedChunks) { + return nil, false } - return s.chunks[ix] + if ix == len(s.mmappedChunks) { + return s.headChunk, false + } + chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) + if err != nil { + if err == chunks.ErrChunkDiskMapperClosed { + return nil, false + } + // TODO(codesome): Find a better way to handle this error instead of a panic. 
+ panic(err) + } + mc := s.memChunkPool.Get().(*memChunk) + mc.chunk = chk + mc.minTime = s.mmappedChunks[ix].minTime + mc.maxTime = s.mmappedChunks[ix].maxTime + return mc, true } func (s *memSeries) chunkID(pos int) int { @@ -1809,27 +1994,32 @@ func (s *memSeries) chunkID(pos int) int { // at or after mint. Chunk IDs remain unchanged. func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { var k int - for i, c := range s.chunks { - if c.maxTime >= mint { - break - } - k = i + 1 - } - s.chunks = append(s.chunks[:0], s.chunks[k:]...) - s.firstChunkID += k - if len(s.chunks) == 0 { + if s.headChunk != nil && s.headChunk.maxTime < mint { + // If head chunk is truncated, we can truncate all mmapped chunks. + k = 1 + len(s.mmappedChunks) + s.firstChunkID += k s.headChunk = nil - } else { - s.headChunk = s.chunks[len(s.chunks)-1] + s.mmappedChunks = nil + return k + } + if len(s.mmappedChunks) > 0 { + for i, c := range s.mmappedChunks { + if c.maxTime >= mint { + break + } + k = i + 1 + } + s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[k:]...) + s.firstChunkID += k } - return k } // append adds the sample (t, v) to the series. The caller also has to provide // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) -func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) { +// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. 
@@ -1838,7 +2028,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC c := s.head() if c == nil { - c = s.cut(t) + if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { + // Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. + return false, false + } + // There is no chunk in this series yet, create the first chunk for the sample. + c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } numSamples := c.chunk.NumSamples() @@ -1854,7 +2049,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } if t >= s.nextAt { - c = s.cut(t) + c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } s.app.Append(t, v) @@ -1890,8 +2085,19 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { - c := s.chunk(id) +// iterator returns a chunk iterator. +// It is unsafe to call this concurrently with s.append(...) without holding the series lock. +func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { + c, garbageCollect := s.chunk(id, chunkDiskMapper) + defer func() { + if garbageCollect { + // Set this to nil so that Go GC can collect it after it has been used. + // This should be done always at the end. + c.chunk = nil + s.memChunkPool.Put(c) + } + }() + // TODO(fabxc): Work around! A querier may have retrieved a pointer to a // series's chunk, which got then garbage collected before it got // accessed. We must ensure to not garbage collect as long as any @@ -1909,12 +2115,18 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera totalSamples := 0 // Total samples in this series. 
previousSamples := 0 // Samples before this chunk. - for j, d := range s.chunks { - totalSamples += d.chunk.NumSamples() + for j, d := range s.mmappedChunks { + totalSamples += int(d.numSamples) if j < ix { - previousSamples += d.chunk.NumSamples() + previousSamples += int(d.numSamples) } } + // mmappedChunks does not contain the last chunk. Hence check it separately. + if len(s.mmappedChunks) < ix { + previousSamples += s.headChunk.chunk.NumSamples() + } else { + totalSamples += s.headChunk.chunk.NumSamples() + } // Removing the extra transactionIDs that are relevant for samples that // come after this chunk, from the total transactionIDs. @@ -1943,7 +2155,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera return chunkenc.NewNopIterator() } - if id-s.firstChunkID < len(s.chunks)-1 { + if id-s.firstChunkID < len(s.mmappedChunks) { if stopAfter == numSamples { return c.chunk.Iterator(it) } @@ -1989,7 +2201,7 @@ type memChunk struct { minTime, maxTime int64 } -// Returns true if the chunk overlaps [mint, maxt]. +// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt]. func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } @@ -2052,3 +2264,14 @@ func (ss stringset) slice() []string { sort.Strings(slice) return slice } + +type mmappedChunk struct { + ref uint64 + numSamples uint16 + minTime, maxTime int64 +} + +// Returns true if the chunk overlaps [mint, maxt]. 
+func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool { + return mc.minTime <= maxt && mint <= mc.maxTime +} diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index c1ada34171..bd0f248267 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -14,6 +14,8 @@ package tsdb import ( + "io/ioutil" + "os" "strconv" "sync/atomic" "testing" @@ -23,8 +25,13 @@ import ( ) func BenchmarkHeadStripeSeriesCreate(b *testing.B) { + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() @@ -34,8 +41,13 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) { } func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index ed4a4e51dc..7ac9b599b4 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -19,7 +19,6 @@ import ( "math" "math/rand" "os" - "path" "path/filepath" "sort" "strconv" @@ -42,10 +41,11 @@ import ( func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - - h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize) - testutil.Ok(b, err) - defer h.Close() + h, _, closer := newTestHead(b, 10000, false) + defer closer() + defer func() { + testutil.Ok(b, h.Close()) + }() b.ReportAllocs() b.ResetTimer() @@ -173,7 +173,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Load the WAL. 
for i := 0; i < b.N; i++ { - h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) testutil.Ok(b, err) h.Init(0) } @@ -209,20 +209,15 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, } - dir, err := ioutil.TempDir("", "test_read_wal") - testutil.Ok(t, err) + + head, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, head.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) - testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(101), head.lastSeriesID) @@ -244,72 +239,78 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, c.Err()) return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, nil))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, nil))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, nil))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) + testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) }) } } func TestHead_WALMultiRef(t *testing.T) { - dir, err := ioutil.TempDir("", "test_wal_multi_ref") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - - head, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) + head, w, closer := newTestHead(t, 1000, false) + defer closer() testutil.Ok(t, head.Init(0)) + app := head.Appender() ref1, 
err := app.Add(labels.FromStrings("foo", "bar"), 100, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) - testutil.Ok(t, head.Truncate(200)) - + // Add another sample outside chunk range to mmap a chunk. app = head.Appender() - ref2, err := app.Add(labels.FromStrings("foo", "bar"), 300, 2) + _, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) - if ref1 == ref2 { - t.Fatal("Refs are the same") - } + testutil.Ok(t, head.Truncate(1600)) + + app = head.Appender() + ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) + + // Add another sample outside chunk range to mmap a chunk. + app = head.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) + + testutil.Assert(t, ref1 != ref2, "Refs are the same") testutil.Ok(t, head.Close()) - w, err = wal.New(nil, nil, dir, false) + w, err = wal.New(nil, nil, w.Dir(), false) testutil.Ok(t, err) - head, err = NewHead(nil, nil, w, 10000, DefaultStripeSize) + head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) testutil.Ok(t, head.Init(0)) - defer head.Close() + defer func() { + testutil.Ok(t, head.Close()) + }() - q, err := NewBlockQuerier(head, 0, 300) + q, err := NewBlockQuerier(head, 0, 2100) testutil.Ok(t, err) series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) - testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}, series) + testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: { + sample{100, 1}, + 
sample{1500, 2}, + sample{1700, 3}, + sample{2000, 4}, + }}, series) } func TestHead_Truncate(t *testing.T) { - dir, err := ioutil.TempDir("", "test_truncate") - testutil.Ok(t, err) + h, _, closer := newTestHead(t, 1000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - - h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() - h.initTime(0) s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) @@ -317,35 +318,35 @@ func TestHead_Truncate(t *testing.T) { s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) - s1.chunks = []*memChunk{ - {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + s1.mmappedChunks = []*mmappedChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, } - s2.chunks = []*memChunk{ - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, + s2.mmappedChunks = []*mmappedChunk{ + {minTime: 1000, maxTime: 1999}, + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, } - s3.chunks = []*memChunk{ - {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, - {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + s3.mmappedChunks = []*mmappedChunk{ + {minTime: 0, maxTime: 999}, + {minTime: 1000, maxTime: 1999}, } - s4.chunks = []*memChunk{} + s4.mmappedChunks = []*mmappedChunk{} // Truncation need not be aligned. 
testutil.Ok(t, h.Truncate(1)) testutil.Ok(t, h.Truncate(2000)) - testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - }, h.series.getByID(s1.ref).chunks) + testutil.Equals(t, []*mmappedChunk{ + {minTime: 2000, maxTime: 2999}, + }, h.series.getByID(s1.ref).mmappedChunks) - testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, - {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, - }, h.series.getByID(s2.ref).chunks) + testutil.Equals(t, []*mmappedChunk{ + {minTime: 2000, maxTime: 2999}, + {minTime: 3000, maxTime: 3999}, + }, h.series.getByID(s2.ref).mmappedChunks) testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") testutil.Assert(t, h.series.getByID(s4.ref) == nil, "") @@ -382,36 +383,57 @@ func TestHead_Truncate(t *testing.T) { // Validate various behaviors brought on by firstChunkID accounting for // garbage collected chunks. func TestMemSeries_truncateChunks(t *testing.T) { - s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) + dir, err := ioutil.TempDir("", "truncate_chunks") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, chunkDiskMapper.Close()) + }() + + memChunkPool := sync.Pool{ + New: func() interface{} { + return &memChunk{} + }, + } + + s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) testutil.Assert(t, ok == true, "sample append failed") } // Check that truncate removes half of the chunks and afterwards // that the ID of the last chunk still gives us the same chunk afterwards. 
- countBefore := len(s.chunks) + countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. lastID := s.chunkID(countBefore - 1) - lastChunk := s.chunk(lastID) + lastChunk, _ := s.chunk(lastID, chunkDiskMapper) - testutil.Assert(t, s.chunk(0) != nil, "") + chk, _ := s.chunk(0, chunkDiskMapper) + testutil.Assert(t, chk != nil, "") testutil.Assert(t, lastChunk != nil, "") s.truncateChunksBefore(2000) - testutil.Equals(t, int64(2000), s.chunks[0].minTime) - testutil.Assert(t, s.chunk(0) == nil, "first chunks not gone") - testutil.Equals(t, countBefore/2, len(s.chunks)) - testutil.Equals(t, lastChunk, s.chunk(lastID)) + testutil.Equals(t, int64(2000), s.mmappedChunks[0].minTime) + chk, _ = s.chunk(0, chunkDiskMapper) + testutil.Assert(t, chk == nil, "first chunks not gone") + testutil.Equals(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. + chk, _ = s.chunk(lastID, chunkDiskMapper) + testutil.Equals(t, lastChunk, chk) // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. 
- it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil) + it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") - it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil) + it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } @@ -432,20 +454,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - dir, err := ioutil.TempDir("", "test_delete_series") - testutil.Ok(t, err) + head, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, head.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - testutil.Ok(t, head.Init(math.MinInt64)) testutil.Ok(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) @@ -506,23 +522,12 @@ func TestHeadDeleteSimple(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { for _, c := range cases { - dir, err := ioutil.TempDir("", "test_wal_reload") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress) - testutil.Ok(t, err) - defer w.Close() - - head, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer head.Close() + head, w, closer := newTestHead(t, 1000, compress) + defer closer() app := head.Appender() for _, smpl := range smplsAll { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) } @@ -536,7 +541,7 @@ func 
TestHeadDeleteSimple(t *testing.T) { // Add more samples. app = head.Appender() for _, smpl := range c.addSamples { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) } @@ -545,10 +550,8 @@ func TestHeadDeleteSimple(t *testing.T) { // Compare the samples for both heads - before and after the reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. testutil.Ok(t, err) - defer reloadedW.Close() - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, DefaultStripeSize) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) - defer reloadedHead.Close() testutil.Ok(t, reloadedHead.Init(0)) // Compare the query results for both heads - before and after the reload. @@ -559,6 +562,7 @@ func TestHeadDeleteSimple(t *testing.T) { actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) + testutil.Ok(t, q.Close()) expSeriesSet := newMockSeriesSet([]storage.Series{ newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) @@ -596,10 +600,13 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { + hb, _, closer := newTestHead(t, 1000000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() + numSamples := int64(10) - hb, err := NewHead(nil, nil, nil, 1000000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() app := hb.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { @@ -640,19 +647,12 @@ func TestDeleteUntilCurMax(t *testing.T) { } func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { - dir, err := ioutil.TempDir("", 
"test_delete_wal") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) + numSamples := 10000 // Enough samples to cause a checkpoint. - numSamples := 10000 - hb, err := NewHead(nil, nil, wlog, int64(numSamples)*10, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, w, closer := newTestHead(t, int64(numSamples)*10, false) + defer closer() + for i := 0; i < numSamples; i++ { app := hb.Appender() _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) @@ -664,11 +664,11 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { testutil.Ok(t, hb.Close()) // Confirm there's been a checkpoint. - cdir, _, err := wal.LastCheckpoint(dir) + cdir, _, err := wal.LastCheckpoint(w.Dir()) testutil.Ok(t, err) // Read in checkpoint and WAL. recs := readTestWAL(t, cdir) - recs = append(recs, readTestWAL(t, dir)...) + recs = append(recs, readTestWAL(t, w.Dir())...) var series, samples, stones int for _, rec := range recs { @@ -740,13 +740,13 @@ func TestDelete_e2e(t *testing.T) { for _, l := range lbls { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - dir, _ := ioutil.TempDir("", "test") + + hb, _, closer := newTestHead(t, 100000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, hb.Close()) }() - hb, err := NewHead(nil, nil, nil, 100000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + app := hb.Appender() for _, l := range lbls { ls := labels.New(l...) @@ -926,68 +926,85 @@ func TestComputeChunkEndTime(t *testing.T) { } func TestMemSeries_append(t *testing.T) { - s := newMemSeries(labels.Labels{}, 1, 500) + dir, err := ioutil.TempDir("", "append") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + // This is usually taken from the Head, but passing manually here. 
+ chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, chunkDiskMapper.Close()) + }() + + s := newMemSeries(labels.Labels{}, 1, 500, nil) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0) + ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0) + ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3, 0) + ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0) + ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - testutil.Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range") - testutil.Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range") + testutil.Assert(t, len(s.mmappedChunks) == 1, "there should be only 1 mmapped chunk") + testutil.Assert(t, s.mmappedChunks[0].minTime == 998 && s.mmappedChunks[0].maxTime == 999, "wrong chunk range") + testutil.Assert(t, s.headChunk.minTime == 1000 && s.headChunk.maxTime == 1001, "wrong chunk range") // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. 
for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0) + ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper) testutil.Assert(t, ok, "append failed") } - testutil.Assert(t, len(s.chunks) > 7, "expected intermediate chunks") + testutil.Assert(t, len(s.mmappedChunks)+1 > 7, "expected intermediate chunks") // All chunks but the first and last should now be moderately full. - for i, c := range s.chunks[1 : len(s.chunks)-1] { - testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) + for i, c := range s.mmappedChunks[1:] { + chk, err := chunkDiskMapper.Chunk(c.ref) + testutil.Ok(t, err) + testutil.Assert(t, chk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, chk.NumSamples()) } } func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. 
- ok, chunkCreated = s.append(1000, 1000, 0) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -1004,7 +1021,7 @@ func TestGCChunkAccess(t *testing.T) { testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 1500, nil) - _, err = cr.Chunk(chunks[0].Ref) + _, err := cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1019,27 +1036,29 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. 
- ok, chunkCreated = s.append(1000, 1000, 0) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -1056,7 +1075,7 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 2000, nil) - _, err = cr.Chunk(chunks[0].Ref) + _, err := cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) @@ -1072,15 +1091,17 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") - _, err = app.Add(lset, 2100, 1) + _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) @@ -1100,15 +1121,17 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") - _, err = app.Add(lset, 2100, 1) + _, err := app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) @@ -1134,20 +1157,14 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", 
compress), func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_rollback") - testutil.Ok(t, err) + h, w, closer := newTestHead(t, 1000, compress) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - w, err := wal.New(nil, nil, dir, compress) - testutil.Ok(t, err) - defer w.Close() - h, err := NewHead(nil, nil, w, 1000, DefaultStripeSize) - testutil.Ok(t, err) - app := h.Appender() - _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) + _, err := app.Add(labels.FromStrings("a", "b"), 1, 2) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) @@ -1231,7 +1248,7 @@ func TestWalRepair_DecodingError(t *testing.T) { testutil.Ok(t, w.Log(test.rec)) } - h, err := NewHead(nil, nil, w, 1, DefaultStripeSize) + h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1271,18 +1288,78 @@ func TestWalRepair_DecodingError(t *testing.T) { } } -func TestNewWalSegmentOnTruncate(t *testing.T) { - dir, err := ioutil.TempDir("", "test_wal_segments") +func TestHeadReadWriterRepair(t *testing.T) { + dir, err := ioutil.TempDir("", "head_read_writer_repair") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) - h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + const chunkRange = chunks.DefaultHeadChunkFileMaxTimeRange // to hold 4 chunks per segment. + + walDir := filepath.Join(dir, "wal") + // Fill the chunk segments and corrupt it. 
+ { + w, err := wal.New(nil, nil, walDir, false) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) + testutil.Ok(t, h.Init(math.MinInt64)) + + s, created := h.getOrCreate(1, labels.FromStrings("a", "1")) + testutil.Assert(t, created, "series was not created") + + for i := 0; i < 7; i++ { + ok, chunkCreated := s.append(int64(i*int(chunkRange)), float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + testutil.Assert(t, ok, "series append failed") + testutil.Assert(t, chunkCreated, "chunk was not created") + ok, chunkCreated = s.append(int64(i*int(chunkRange))+chunkRange-1, float64(i*int(chunkRange)), 0, h.chunkDiskMapper) + testutil.Assert(t, ok, "series append failed") + testutil.Assert(t, !chunkCreated, "chunk was created") + } + testutil.Ok(t, h.Close()) + + // Verify that there are 6 segment files. + files, err := ioutil.ReadDir(mmappedChunksDir(dir)) + testutil.Ok(t, err) + testutil.Equals(t, 6, len(files)) + + // Corrupt the 4th file by writing a random byte to series ref. + f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666) + testutil.Ok(t, err) + n, err := f.WriteAt([]byte{67, 88}, chunks.HeadChunkFileHeaderSize+2) + testutil.Ok(t, err) + testutil.Equals(t, 2, n) + testutil.Ok(t, f.Close()) + } + + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.mmapChunkCorruptionTotal)) + } + + // Verify that there are 3 segment files after the repair. + // The segments from the corrupt segment should be removed. 
+ { + files, err := ioutil.ReadDir(mmappedChunksDir(dir)) + testutil.Ok(t, err) + testutil.Equals(t, 3, len(files)) + } +} + +func TestNewWalSegmentOnTruncate(t *testing.T) { + h, wlog, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() add := func(ts int64) { app := h.Appender() _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0) @@ -1309,21 +1386,15 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - dir, err := ioutil.TempDir("", "test_duplicate_label_name") - testutil.Ok(t, err) + h, _, closer := newTestHead(t, 1000, false) + defer closer() defer func() { - testutil.Ok(t, os.RemoveAll(dir)) + testutil.Ok(t, h.Close()) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768, false) - testutil.Ok(t, err) - - h, err := NewHead(nil, nil, wlog, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() add := func(labels labels.Labels, labelName string) { app := h.Appender() - _, err = app.Add(labels, 0, 0) + _, err := app.Add(labels, 0, 0) testutil.NotOk(t, err) testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) } @@ -1335,22 +1406,20 @@ func TestAddDuplicateLabelName(t *testing.T) { func TestMemSeriesIsolation(t *testing.T) { // Put a series, select it. GC it and then access it. 
- hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() - lastValue := func(maxAppendID uint64) int { - idx, err := hb.Index() + lastValue := func(h *Head, maxAppendID uint64) int { + idx, err := h.Index() + testutil.Ok(t, err) - iso := hb.iso.State() + iso := h.iso.State() iso.maxAppendID = maxAppendID querier := &blockQuerier{ mint: 0, maxt: 10000, index: idx, - chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso), + chunks: h.chunksRange(math.MinInt64, math.MaxInt64, iso), tombstones: tombstones.NewMemTombstones(), } @@ -1368,50 +1437,61 @@ func TestMemSeriesIsolation(t *testing.T) { return -1 } - i := 1 - for ; i <= 1000; i++ { - var app storage.Appender - // To initialize bounds. - if hb.MinTime() == math.MaxInt64 { - app = &initAppender{head: hb} - } else { - a := hb.appender() - a.cleanupAppendIDsBelow = 0 - app = a - } + addSamples := func(h *Head) int { + i := 1 + for ; i <= 1000; i++ { + var app storage.Appender + // To initialize bounds. + if h.MinTime() == math.MaxInt64 { + app = &initAppender{head: h} + } else { + a := h.appender() + a.cleanupAppendIDsBelow = 0 + app = a + } - _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + return i } + testIsolation := func(h *Head, i int) { + } + + // Test isolation without restart of Head. + hb, _, closer := newTestHead(t, 1000, false) + i := addSamples(hb) + testIsolation(hb, i) + // Test simple cases in different chunks when no appendID cleanup has been performed. 
- testutil.Equals(t, 10, lastValue(10)) - testutil.Equals(t, 130, lastValue(130)) - testutil.Equals(t, 160, lastValue(160)) - testutil.Equals(t, 240, lastValue(240)) - testutil.Equals(t, 500, lastValue(500)) - testutil.Equals(t, 750, lastValue(750)) - testutil.Equals(t, 995, lastValue(995)) - testutil.Equals(t, 999, lastValue(999)) + testutil.Equals(t, 10, lastValue(hb, 10)) + testutil.Equals(t, 130, lastValue(hb, 130)) + testutil.Equals(t, 160, lastValue(hb, 160)) + testutil.Equals(t, 240, lastValue(hb, 240)) + testutil.Equals(t, 500, lastValue(hb, 500)) + testutil.Equals(t, 750, lastValue(hb, 750)) + testutil.Equals(t, 995, lastValue(hb, 995)) + testutil.Equals(t, 999, lastValue(hb, 999)) // Cleanup appendIDs below 500. app := hb.appender() app.cleanupAppendIDsBelow = 500 - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) i++ // We should not get queries with a maxAppendID below 500 after the cleanup, // but they only take the remaining appendIDs into account. - testutil.Equals(t, 499, lastValue(10)) - testutil.Equals(t, 499, lastValue(130)) - testutil.Equals(t, 499, lastValue(160)) - testutil.Equals(t, 499, lastValue(240)) - testutil.Equals(t, 500, lastValue(500)) - testutil.Equals(t, 995, lastValue(995)) - testutil.Equals(t, 999, lastValue(999)) + testutil.Equals(t, 499, lastValue(hb, 10)) + testutil.Equals(t, 499, lastValue(hb, 130)) + testutil.Equals(t, 499, lastValue(hb, 160)) + testutil.Equals(t, 499, lastValue(hb, 240)) + testutil.Equals(t, 500, lastValue(hb, 500)) + testutil.Equals(t, 995, lastValue(hb, 995)) + testutil.Equals(t, 999, lastValue(hb, 999)) // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. 
@@ -1420,12 +1500,12 @@ func TestMemSeriesIsolation(t *testing.T) { _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - testutil.Equals(t, 999, lastValue(998)) - testutil.Equals(t, 999, lastValue(999)) - testutil.Equals(t, 1000, lastValue(1000)) - testutil.Equals(t, 1001, lastValue(1001)) - testutil.Equals(t, 1002, lastValue(1002)) - testutil.Equals(t, 1002, lastValue(1003)) + testutil.Equals(t, 999, lastValue(hb, 998)) + testutil.Equals(t, 999, lastValue(hb, 999)) + testutil.Equals(t, 1000, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1002, lastValue(hb, 1002)) + testutil.Equals(t, 1002, lastValue(hb, 1003)) i++ // Cleanup appendIDs below 1001, but with a rollback. @@ -1434,21 +1514,71 @@ func TestMemSeriesIsolation(t *testing.T) { _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) - testutil.Equals(t, 1000, lastValue(999)) - testutil.Equals(t, 1000, lastValue(1000)) - testutil.Equals(t, 1001, lastValue(1001)) - testutil.Equals(t, 1002, lastValue(1002)) - testutil.Equals(t, 1002, lastValue(1003)) + testutil.Equals(t, 1000, lastValue(hb, 999)) + testutil.Equals(t, 1000, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1002, lastValue(hb, 1002)) + testutil.Equals(t, 1002, lastValue(hb, 1003)) + + testutil.Ok(t, hb.Close()) + closer() + + // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. 
+ hb, w, closer := newTestHead(t, 1000, false) + defer closer() + i = addSamples(hb) + testutil.Ok(t, hb.Close()) + + wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) + testutil.Ok(t, err) + hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize) + defer func() { testutil.Ok(t, hb.Close()) }() + testutil.Ok(t, err) + testutil.Ok(t, hb.Init(0)) + + // No appends after restarting. Hence all should return the last value. + testutil.Equals(t, 1000, lastValue(hb, 10)) + testutil.Equals(t, 1000, lastValue(hb, 130)) + testutil.Equals(t, 1000, lastValue(hb, 160)) + testutil.Equals(t, 1000, lastValue(hb, 240)) + testutil.Equals(t, 1000, lastValue(hb, 500)) + + // Cleanup appendIDs below 1000, which means the sample buffer is + // the only thing with appendIDs. + app = hb.appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + i++ + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 1001, lastValue(hb, 998)) + testutil.Equals(t, 1001, lastValue(hb, 999)) + testutil.Equals(t, 1001, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1001, lastValue(hb, 1002)) + testutil.Equals(t, 1001, lastValue(hb, 1003)) + + // Cleanup appendIDs below 1002, but with a rollback. + app = hb.appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Rollback()) + testutil.Equals(t, 1001, lastValue(hb, 999)) + testutil.Equals(t, 1001, lastValue(hb, 1000)) + testutil.Equals(t, 1001, lastValue(hb, 1001)) + testutil.Equals(t, 1001, lastValue(hb, 1002)) + testutil.Equals(t, 1001, lastValue(hb, 1003)) } func TestIsolationRollback(t *testing.T) { // Rollback after a failed append and test if the low watermark has progressed anyway. 
- hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app := hb.Appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) @@ -1469,12 +1599,14 @@ func TestIsolationRollback(t *testing.T) { } func TestIsolationLowWatermarkMonotonous(t *testing.T) { - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app1 := hb.Appender() - _, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0) testutil.Ok(t, err) testutil.Ok(t, app1.Commit()) testutil.Equals(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") @@ -1501,15 +1633,17 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { } func TestIsolationAppendIDZeroIsNoop(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer h.Close() + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) - ok, _ := s.append(0, 0, 0) + ok, _ := s.append(0, 0, 0, h.chunkDiskMapper) testutil.Assert(t, ok, "Series append failed.") testutil.Equals(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } @@ -1521,25 +1655,110 @@ func TestHeadSeriesChunkRace(t *testing.T) { } func TestIsolationWithoutAdd(t *testing.T) { - hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) - testutil.Ok(t, err) - defer hb.Close() + hb, _, closer := 
newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, hb.Close()) + }() app := hb.Appender() testutil.Ok(t, app.Commit()) app = hb.Appender() - _, err = app.Add(labels.FromStrings("foo", "baz"), 1, 1) + _, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") } -func testHeadSeriesChunkRace(t *testing.T) { - h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) +func TestOutOfOrderSamplesMetric(t *testing.T) { + dir, err := ioutil.TempDir("", "test") testutil.Ok(t, err) - defer h.Close() + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + db, err := Open(dir, nil, nil, DefaultOptions()) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + db.DisableCompactions() + + app := db.Appender() + for i := 1; i <= 5; i++ { + _, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Test out of order metric. + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 2, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), 3, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), 4, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + testutil.Ok(t, app.Commit()) + + // Compact Head to test out of bound metric. 
+ app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Equals(t, int64(math.MinInt64), db.head.minValidTime) + testutil.Ok(t, db.Compact()) + testutil.Assert(t, db.head.minValidTime > 0, "") + + app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-2, 99) + testutil.Equals(t, storage.ErrOutOfBounds, err) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime-1, 99) + testutil.Equals(t, storage.ErrOutOfBounds, err) + testutil.Equals(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) + testutil.Ok(t, app.Commit()) + + // Some more valid samples for out of order. + app = db.Appender() + for i := 1; i <= 5; i++ { + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+int64(i), 99) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + // Test out of order metric. 
+ app = db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+2, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+3, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + + _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime+DefaultBlockDuration+4, 99) + testutil.Equals(t, storage.ErrOutOfOrderSample, err) + testutil.Equals(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) + testutil.Ok(t, app.Commit()) +} + +func testHeadSeriesChunkRace(t *testing.T) { + h, _, closer := newTestHead(t, 1000, false) + defer closer() + defer func() { + testutil.Ok(t, h.Close()) + }() testutil.Ok(t, h.Init(0)) app := h.Appender() @@ -1568,3 +1787,19 @@ func testHeadSeriesChunkRace(t *testing.T) { testutil.Ok(t, ss.Err()) wg.Wait() } + +func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL, func()) { + dir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize) + testutil.Ok(t, err) + + testutil.Ok(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) + + return h, wlog, func() { + testutil.Ok(t, os.RemoveAll(dir)) + } +} diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 3d0a3fbcf6..2a03852f20 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -30,7 +30,12 @@ const ( ) func BenchmarkPostingsForMatchers(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", 
"chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer func() { testutil.Ok(b, h.Close()) @@ -126,7 +131,12 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) { } func BenchmarkQuerierSelect(b *testing.B) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(b, err) defer h.Close() app := h.Appender() diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index d6f8d10aaa..8cf13249aa 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1857,7 +1857,12 @@ func TestFindSetMatches(t *testing.T) { } func TestPostingsForMatchers(t *testing.T) { - h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(chunkDir)) + }() + h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize) testutil.Ok(t, err) defer func() { testutil.Ok(t, h.Close()) @@ -2220,7 +2225,12 @@ func BenchmarkQueries(b *testing.B) { queryTypes["_3-Blocks"] = &querier{blocks: qs[0:3]} queryTypes["_10-Blocks"] = &querier{blocks: qs} - head := createHead(b, series) + chunkDir, err := ioutil.TempDir("", "chunk_dir") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(chunkDir)) + }() + head := createHead(b, series, chunkDir) qHead, err := NewBlockQuerier(head, 1, int64(nSamples)) testutil.Ok(b, err) queryTypes["_Head"] = qHead @@ -2232,6 +2242,7 @@ func BenchmarkQueries(b *testing.B) { benchQuery(b, expExpansions, querier, selectors) }) } + testutil.Ok(b, head.Close()) } } } diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index 
5246edd64f..8a26e91e79 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -32,8 +32,9 @@ type MetricSample struct { } // CreateHead creates a TSDB writer head to write the sample data to. -func CreateHead(samples []*MetricSample, chunkRange int64, logger log.Logger) (*Head, error) { - head, err := NewHead(nil, logger, nil, chunkRange, DefaultStripeSize) +func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logger log.Logger) (*Head, error) { + head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize) + if err != nil { return nil, err } @@ -60,10 +61,15 @@ func CreateBlock(samples []*MetricSample, dir string, mint, maxt int64, logger l if chunkRange < 0 { return "", ErrInvalidTimes } - head, err := CreateHead(samples, chunkRange, logger) + chunkDir := filepath.Join(dir, "chunks_tmp") + defer func() { + os.RemoveAll(chunkDir) + }() + head, err := CreateHead(samples, chunkRange, chunkDir, logger) if err != nil { return "", err } + defer head.Close() compactor, err := NewLeveledCompactor(context.Background(), nil, logger, ExponentialBlockRanges(DefaultBlockDuration, 3, 5), nil) if err != nil { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index e0c7c74f9f..d693feb4e9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1898,8 +1898,18 @@ type fakeDB struct { func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err } func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } -func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) { - h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) +func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { + dbDir, err := ioutil.TempDir("", "tsdb-api-ready") + if err != nil { + return nil, err + } + defer func() { + err := os.RemoveAll(dbDir) + if retErr != nil { + retErr = err + } + }() 
+ h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize) return h.Stats(statsByLabelName), nil } diff --git a/web/web_test.go b/web/web_test.go index 23fbf3e490..96c4c5c4f5 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -100,6 +100,7 @@ func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) { func TestReadyAndHealthy(t *testing.T) { t.Parallel() + dbDir, err := ioutil.TempDir("", "tsdb-ready") testutil.Ok(t, err) defer testutil.Ok(t, os.RemoveAll(dbDir)) @@ -426,7 +427,6 @@ func TestDebugHandler(t *testing.T) { func TestHTTPMetrics(t *testing.T) { t.Parallel() - handler := New(nil, &Options{ RoutePrefix: "/", ListenAddress: "somehost:9090",