M-map full chunks of Head from disk (#6679)

When appending to the head and a chunk is full it is flushed to the disk and m-mapped (memory mapped) to free up memory

Prom startup now happens in these stages
- Iterate the m-mapped chunks from disk and keep a map of series reference to its slice of m-mapped chunks.
- Iterate the WAL as usual. Whenever we create a new series, look for its m-mapped chunks in the map created before and add it to that series.

If a head chunk is corrupted, the corrupted one and all chunks after that are deleted, and the data after the corruption is recovered from the existing WAL, which means that a corruption in m-mapped files results in NO data loss.

[M-mapped chunks format](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/head_chunks.md)  - main difference is that the chunk for m-mapping now also includes the series reference because there is no index for mapping series to chunks.
[The block chunks](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/chunks.md) are accessed from the index which includes the offsets for the chunks in the chunks file - example - chunks of series ID have offsets 200, 500 etc in the chunk files.
In case of m-mapped chunks, the offsets are stored in memory and accessed from there. During WAL replay, these offsets are restored by iterating all m-mapped chunks as stated above, by matching the series id present in the chunk header and the offset of that chunk in that file.

**Prombench results**

_WAL Replay_

1h WAL replay time
30% less WAL replay time - 4m31 vs 3m36
2h WAL replay time
20% less WAL replay time - 8m16 vs 7m

_Memory During WAL Replay_

High Churn:
10-15% less RAM -  32gb vs 28gb
20% less RAM after compaction 34gb vs 27gb
No Churn:
20-30% less RAM -  23gb vs 18gb
40% less RAM after compaction 32.5gb vs 20gb

Screenshots are in [this comment](https://github.com/prometheus/prometheus/pull/6679#issuecomment-621678932)


Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2020-05-06 21:00:00 +05:30 committed by GitHub
parent 532f7bbac9
commit d4b9fe801f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 985 additions and 428 deletions

View file

@ -246,7 +246,19 @@ func (p *queryLogTest) run(t *testing.T) {
p.setQueryLog(t, "") p.setQueryLog(t, "")
} }
params := append([]string{"-test.main", "--config.file=" + p.configFile.Name(), "--web.enable-lifecycle", fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port)}, p.params()...) dir, err := ioutil.TempDir("", "query_log_test")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
params := append([]string{
"-test.main",
"--config.file=" + p.configFile.Name(),
"--web.enable-lifecycle",
fmt.Sprintf("--web.listen-address=%s:%d", p.host, p.port),
"--storage.tsdb.path=" + dir,
}, p.params()...)
prom := exec.Command(promPath, params...) prom := exec.Command(promPath, params...)

View file

@ -305,7 +305,12 @@ func TestReadIndexFormatV1(t *testing.T) {
// createBlock creates a block with given set of series and returns its dir. // createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []storage.Series) string { func createBlock(tb testing.TB, dir string, series []storage.Series) string {
return createBlockFromHead(tb, dir, createHead(tb, series)) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, os.RemoveAll(chunkDir)) }()
head := createHead(tb, series, chunkDir)
defer func() { testutil.Ok(tb, head.Close()) }()
return createBlockFromHead(tb, dir, head)
} }
func createBlockFromHead(tb testing.TB, dir string, head *Head) string { func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
@ -321,10 +326,9 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
return filepath.Join(dir, ulid.String()) return filepath.Join(dir, ulid.String())
} }
func createHead(tb testing.TB, series []storage.Series) *Head { func createHead(tb testing.TB, series []storage.Series, chunkDir string) *Head {
head, err := NewHead(nil, nil, nil, 2*60*60*1000, DefaultStripeSize) head, err := NewHead(nil, nil, nil, 2*60*60*1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(tb, err) testutil.Ok(tb, err)
defer head.Close()
app := head.Appender() app := head.Appender()
for _, s := range series { for _, s := range series {

View file

@ -72,13 +72,13 @@ const (
) )
// corruptionErr is an error that's returned when corruption is encountered. // corruptionErr is an error that's returned when corruption is encountered.
type corruptionErr struct { type CorruptionErr struct {
Dir string Dir string
FileIndex int FileIndex int
Err error Err error
} }
func (e *corruptionErr) Error() string { func (e *CorruptionErr) Error() string {
return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
} }
@ -512,7 +512,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
// and runs the provided function on each chunk. It returns on the first error encountered. // and runs the provided function on each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file. // to set the maxt of all the file.
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) { func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error) (err error) {
cdm.writePathMtx.Lock() cdm.writePathMtx.Lock()
defer cdm.writePathMtx.Unlock() defer cdm.writePathMtx.Unlock()
@ -550,7 +550,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
if allZeros { if allZeros {
break break
} }
return &corruptionErr{ return &CorruptionErr{
Dir: cdm.dir.Name(), Dir: cdm.dir.Name(),
FileIndex: segID, FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
@ -577,12 +577,15 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
idx += ChunkEncodingSize // Skip encoding. idx += ChunkEncodingSize // Skip encoding.
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
idx += n + int(dataLen) // Skip the data. idx += n
numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
idx += int(dataLen) // Skip the data.
// In the beginning we only checked for the chunk meta size. // In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again. // Now that we have added the chunk data length, we check for sufficient bytes again.
if idx+CRCSize > fileEnd { if idx+CRCSize > fileEnd {
return &corruptionErr{ return &CorruptionErr{
Dir: cdm.dir.Name(), Dir: cdm.dir.Name(),
FileIndex: segID, FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
@ -595,7 +598,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
return err return err
} }
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return &corruptionErr{ return &CorruptionErr{
Dir: cdm.dir.Name(), Dir: cdm.dir.Name(),
FileIndex: segID, FileIndex: segID,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
@ -607,14 +610,14 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64,
mmapFile.maxt = maxt mmapFile.maxt = maxt
} }
if err := f(seriesRef, chunkRef, mint, maxt); err != nil { if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
return err return err
} }
} }
if idx > fileEnd { if idx > fileEnd {
// It should be equal to the slice length. // It should be equal to the slice length.
return &corruptionErr{ return &CorruptionErr{
Dir: cdm.dir.Name(), Dir: cdm.dir.Name(),
FileIndex: segID, FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
@ -678,11 +681,11 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
return nil return nil
} }
// Repair deletes all the head chunk files after the one which had the corruption // DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file). // (including the corrupt file).
func (cdm *ChunkDiskMapper) Repair(originalErr error) error { func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*corruptionErr) cerr, ok := err.(*CorruptionErr)
if !ok { if !ok {
return errors.Wrap(originalErr, "cannot handle error") return errors.Wrap(originalErr, "cannot handle error")
} }

View file

@ -39,6 +39,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
type expectedDataType struct { type expectedDataType struct {
seriesRef, chunkRef uint64 seriesRef, chunkRef uint64
mint, maxt int64 mint, maxt int64
numSamples uint16
chunk chunkenc.Chunk chunk chunkenc.Chunk
} }
expectedData := []expectedDataType{} expectedData := []expectedDataType{}
@ -51,11 +52,12 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw) seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
totalChunks++ totalChunks++
expectedData = append(expectedData, expectedDataType{ expectedData = append(expectedData, expectedDataType{
seriesRef: seriesRef, seriesRef: seriesRef,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
chunkRef: chkRef, chunkRef: chkRef,
chunk: chunk, chunk: chunk,
numSamples: uint16(chunk.NumSamples()),
}) })
if hrw.curFileSequence != 1 { if hrw.curFileSequence != 1 {
@ -128,7 +130,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
idx := 0 idx := 0
err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error { err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
t.Helper() t.Helper()
expData := expectedData[idx] expData := expectedData[idx]
@ -136,6 +138,7 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
testutil.Equals(t, expData.chunkRef, chunkRef) testutil.Equals(t, expData.chunkRef, chunkRef)
testutil.Equals(t, expData.maxt, maxt) testutil.Equals(t, expData.maxt, maxt)
testutil.Equals(t, expData.maxt, maxt) testutil.Equals(t, expData.maxt, maxt)
testutil.Equals(t, expData.numSamples, numSamples)
actChunk, err := hrw.Chunk(expData.chunkRef) actChunk, err := hrw.Chunk(expData.chunkRef)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -157,7 +160,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
}() }()
testutil.Assert(t, !hrw.fileMaxtSet, "") testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "") testutil.Assert(t, hrw.fileMaxtSet, "")
timeRange := 0 timeRange := 0
@ -227,7 +230,7 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, !hrw.fileMaxtSet, "") testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil })) testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "") testutil.Assert(t, hrw.fileMaxtSet, "")
// Truncating files after restart. // Truncating files after restart.

View file

@ -870,7 +870,12 @@ func BenchmarkCompactionFromHead(b *testing.B) {
for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
labelValues := totalSeries / labelNames labelValues := totalSeries / labelNames
b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err) testutil.Ok(b, err)
for ln := 0; ln < labelNames; ln++ { for ln := 0; ln < labelNames; ln++ {
app := h.Appender() app := h.Appender()

View file

@ -309,7 +309,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
if err != nil { if err != nil {
return err return err
} }
head, err := NewHead(nil, db.logger, w, 1, DefaultStripeSize) head, err := NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize)
if err != nil { if err != nil {
return err return err
} }
@ -368,7 +368,7 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
blocks[i] = b blocks[i] = b
} }
head, err := NewHead(nil, db.logger, nil, 1, DefaultStripeSize) head, err := NewHead(nil, db.logger, nil, 1, db.dir, nil, DefaultStripeSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -379,11 +379,14 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
// Also add the WAL if the current blocks don't cover the requests time range. // Also add the WAL if the current blocks don't cover the requests time range.
if maxBlockTime <= maxt { if maxBlockTime <= maxt {
if err := head.Close(); err != nil {
return nil, err
}
w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal")) w, err := wal.Open(db.logger, filepath.Join(db.dir, "wal"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
head, err = NewHead(nil, db.logger, w, 1, DefaultStripeSize) head, err = NewHead(nil, db.logger, w, 1, db.dir, nil, DefaultStripeSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -395,10 +398,10 @@ func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Qu
// Set the wal to nil to disable all wal operations. // Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head. // This is mainly to avoid blocking when closing the head.
head.wal = nil head.wal = nil
db.closers = append(db.closers, head)
} }
db.closers = append(db.closers, head)
// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance. // TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite. // Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
// Option 2: refactor Querier to use another independent func which // Option 2: refactor Querier to use another independent func which
@ -583,19 +586,21 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
var wlog *wal.WAL var wlog *wal.WAL
segmentSize := wal.DefaultSegmentSize segmentSize := wal.DefaultSegmentSize
walDir := filepath.Join(dir, "wal")
// Wal is enabled. // Wal is enabled.
if opts.WALSegmentSize >= 0 { if opts.WALSegmentSize >= 0 {
// Wal is set to a custom size. // Wal is set to a custom size.
if opts.WALSegmentSize > 0 { if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize segmentSize = opts.WALSegmentSize
} }
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression) wlog, err = wal.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
db.head, err = NewHead(r, l, wlog, rngs[0], opts.StripeSize) db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1018,9 +1023,10 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
deletable = make(map[ulid.ULID]*Block) deletable = make(map[ulid.ULID]*Block)
walSize, _ := db.Head().wal.Size() walSize, _ := db.Head().wal.Size()
// Initializing size counter with WAL size, headChunksSize := db.Head().chunkDiskMapper.Size()
// as that is part of the retention strategy. // Initializing size counter with WAL size and Head chunks
blocksSize := walSize // written to disk, as that is part of the retention strategy.
blocksSize := walSize + headChunksSize
for i, block := range blocks { for i, block := range blocks {
blocksSize += block.Size() blocksSize += block.Size()
if blocksSize > int64(db.opts.MaxBytes) { if blocksSize > int64(db.opts.MaxBytes) {

View file

@ -849,8 +849,15 @@ func TestWALSegmentSizeOptions(t *testing.T) {
tests := map[int]func(dbdir string, segmentSize int){ tests := map[int]func(dbdir string, segmentSize int){
// Default Wal Size. // Default Wal Size.
0: func(dbDir string, segmentSize int) { 0: func(dbDir string, segmentSize int) {
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Ok(t, err) testutil.Ok(t, err)
files := []os.FileInfo{}
for _, f := range filesAndDir {
if !f.IsDir() {
files = append(files, f)
}
}
// All the full segment files (all but the last) should match the segment size option.
for _, f := range files[:len(files)-1] { for _, f := range files[:len(files)-1] {
testutil.Equals(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) testutil.Equals(t, int64(DefaultOptions().WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
} }
@ -859,9 +866,16 @@ func TestWALSegmentSizeOptions(t *testing.T) {
}, },
// Custom Wal Size. // Custom Wal Size.
2 * 32 * 1024: func(dbDir string, segmentSize int) { 2 * 32 * 1024: func(dbDir string, segmentSize int) {
files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal")) filesAndDir, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
testutil.Ok(t, err) testutil.Ok(t, err)
files := []os.FileInfo{}
for _, f := range filesAndDir {
if !f.IsDir() {
files = append(files, f)
}
}
testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
// All the full segment files (all but the last) should match the segment size option.
for _, f := range files[:len(files)-1] { for _, f := range files[:len(files)-1] {
testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name()) testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
} }
@ -870,9 +884,12 @@ func TestWALSegmentSizeOptions(t *testing.T) {
}, },
// Wal disabled. // Wal disabled.
-1: func(dbDir string, segmentSize int) { -1: func(dbDir string, segmentSize int) {
if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) { // Check that WAL dir is not there.
t.Fatal("wal directory is present when the wal is disabled") _, err := os.Stat(filepath.Join(dbDir, "wal"))
} testutil.NotOk(t, err)
// Check that there is chunks dir.
_, err = os.Stat(mmappedChunksDir(dbDir))
testutil.Ok(t, err)
}, },
} }
for segmentSize, testFunc := range tests { for segmentSize, testFunc := range tests {

View file

@ -16,6 +16,7 @@ package tsdb
import ( import (
"fmt" "fmt"
"math" "math"
"path/filepath"
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
@ -32,6 +33,7 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tombstones"
@ -54,12 +56,13 @@ type Head struct {
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
lastSeriesID uint64 lastSeriesID uint64
metrics *headMetrics metrics *headMetrics
wal *wal.WAL wal *wal.WAL
logger log.Logger logger log.Logger
appendPool sync.Pool appendPool sync.Pool
seriesPool sync.Pool seriesPool sync.Pool
bytesPool sync.Pool bytesPool sync.Pool
memChunkPool sync.Pool
// All series addressable by their ID or hash. // All series addressable by their ID or hash.
series *stripeSeries series *stripeSeries
@ -80,27 +83,35 @@ type Head struct {
cardinalityMutex sync.Mutex cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
// chunkDiskMapper is used to write and read Head chunks to/from disk.
chunkDiskMapper *chunks.ChunkDiskMapper
// chunkDirRoot is the parent directory of the chunks directory.
chunkDirRoot string
} }
type headMetrics struct { type headMetrics struct {
activeAppenders prometheus.Gauge activeAppenders prometheus.Gauge
series prometheus.GaugeFunc series prometheus.GaugeFunc
seriesCreated prometheus.Counter seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter seriesNotFound prometheus.Counter
chunks prometheus.Gauge chunks prometheus.Gauge
chunksCreated prometheus.Counter chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter chunksRemoved prometheus.Counter
gcDuration prometheus.Summary gcDuration prometheus.Summary
samplesAppended prometheus.Counter samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary outOfBoundSamples prometheus.Counter
walCorruptionsTotal prometheus.Counter outOfOrderSamples prometheus.Counter
headTruncateFail prometheus.Counter walTruncateDuration prometheus.Summary
headTruncateTotal prometheus.Counter walCorruptionsTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter headTruncateFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter headTruncateTotal prometheus.Counter
checkpointCreationFail prometheus.Counter checkpointDeleteFail prometheus.Counter
checkpointCreationTotal prometheus.Counter checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
mmapChunkCorruptionTotal prometheus.Counter
} }
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -155,6 +166,14 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_samples_appended_total", Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.", Help: "Total number of appended samples.",
}), }),
outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_bound_samples_total",
Help: "Total number of out of bound samples ingestion failed attempts.",
}),
outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_order_samples_total",
Help: "Total number of out of order samples ingestion failed attempts.",
}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total", Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.", Help: "Total number of head truncations that failed.",
@ -179,6 +198,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_checkpoint_creations_total", Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.", Help: "Total number of checkpoint creations attempted.",
}), }),
mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_mmap_chunk_corruptions_total",
Help: "Total number of memory-mapped chunk corruptions.",
}),
} }
if r != nil { if r != nil {
@ -195,12 +218,15 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.walTruncateDuration, m.walTruncateDuration,
m.walCorruptionsTotal, m.walCorruptionsTotal,
m.samplesAppended, m.samplesAppended,
m.outOfBoundSamples,
m.outOfOrderSamples,
m.headTruncateFail, m.headTruncateFail,
m.headTruncateTotal, m.headTruncateTotal,
m.checkpointDeleteFail, m.checkpointDeleteFail,
m.checkpointDeleteTotal, m.checkpointDeleteTotal,
m.checkpointCreationFail, m.checkpointCreationFail,
m.checkpointCreationTotal, m.checkpointCreationTotal,
m.mmapChunkCorruptionTotal,
// Metrics bound to functions and not needed in tests // Metrics bound to functions and not needed in tests
// can be created and registered on the spot. // can be created and registered on the spot.
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ prometheus.NewGaugeFunc(prometheus.GaugeOpts{
@ -258,7 +284,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
// stripeSize sets the number of entries in the hash map, it must be a power of 2. // stripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series. // A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, stripeSize int) (*Head, error) { func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int) (*Head, error) {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
} }
@ -278,12 +304,30 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
tombstones: tombstones.NewMemTombstones(), tombstones: tombstones.NewMemTombstones(),
iso: newIsolation(), iso: newIsolation(),
deleted: map[uint64]int{}, deleted: map[uint64]int{},
memChunkPool: sync.Pool{
New: func() interface{} {
return &memChunk{}
},
},
chunkDirRoot: chkDirRoot,
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
if pool == nil {
pool = chunkenc.NewPool()
}
var err error
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), pool)
if err != nil {
return nil, err
}
return h, nil return h, nil
} }
func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") }
// processWALSamples adds a partition of samples it receives to the head and passes // processWALSamples adds a partition of samples it receives to the head and passes
// them on to other workers. // them on to other workers.
// Samples before the mint timestamp are discarded. // Samples before the mint timestamp are discarded.
@ -312,7 +356,7 @@ func (h *Head) processWALSamples(
} }
refSeries[s.Ref] = ms refSeries[s.Ref] = ms
} }
if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated { if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
h.metrics.chunksCreated.Inc() h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc() h.metrics.chunks.Inc()
} }
@ -351,7 +395,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
} }
} }
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) {
// Track number of samples that referenced a series we don't know about // Track number of samples that referenced a series we don't know about
// for error reporting. // for error reporting.
var unknownRefs uint64 var unknownRefs uint64
@ -472,7 +516,20 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
for _, s := range v { for _, s := range v {
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if !created { if created {
// If this series gets a duplicate record, we don't restore its mmapped chunks,
// and instead restore everything from WAL records.
series.mmappedChunks = mmappedChunks[series.ref]
h.metrics.chunks.Add(float64(len(series.mmappedChunks)))
h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks)))
if len(series.mmappedChunks) > 0 {
h.updateMinMaxTime(series.minTime(), series.maxTime())
}
} else {
// TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID.
// There's already a different ref for this series. // There's already a different ref for this series.
multiRef[s.Ref] = series.ref multiRef[s.Ref] = series.ref
} }
@ -572,8 +629,20 @@ func (h *Head) Init(minValidTime int64) error {
return nil return nil
} }
level.Info(h.logger).Log("msg", "Replaying WAL, this may take awhile") level.Info(h.logger).Log("msg", "Replaying WAL and on-disk memory mappable chunks if any, this may take a while")
start := time.Now() start := time.Now()
mmappedChunks, err := h.loadMmappedChunks()
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
h.metrics.mmapChunkCorruptionTotal.Inc()
}
// If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt).
h.removeCorruptedMmappedChunks(err)
}
// Backfill the checkpoint first if it exists. // Backfill the checkpoint first if it exists.
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
if err != nil && err != record.ErrNotFound { if err != nil && err != record.ErrNotFound {
@ -593,7 +662,7 @@ func (h *Head) Init(minValidTime int64) error {
// A corrupted checkpoint is a hard error for now and requires user // A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway. // intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil { if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil {
return errors.Wrap(err, "backfill checkpoint") return errors.Wrap(err, "backfill checkpoint")
} }
startFrom++ startFrom++
@ -614,7 +683,7 @@ func (h *Head) Init(minValidTime int64) error {
} }
sr := wal.NewSegmentBufReader(s) sr := wal.NewSegmentBufReader(s)
err = h.loadWAL(wal.NewReader(sr), multiRef) err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
if err := sr.Close(); err != nil { if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
} }
@ -629,6 +698,54 @@ func (h *Head) Init(minValidTime int64) error {
return nil return nil
} }
func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
mmappedChunks := map[uint64][]*mmappedChunk{}
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
if maxt < h.minValidTime {
return nil
}
slice := mmappedChunks[seriesRef]
if len(slice) > 0 {
if slice[len(slice)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
}
}
slice = append(slice, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
numSamples: numSamples,
})
mmappedChunks[seriesRef] = slice
return nil
}); err != nil {
return nil, errors.Wrap(err, "iterate on on-disk chunks")
}
return mmappedChunks, nil
}
// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
// loaded mmapped chunks.
func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk {
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err)
return map[uint64][]*mmappedChunk{}
}
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
mmappedChunks, err := h.loadMmappedChunks()
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
mmappedChunks = map[uint64][]*mmappedChunk{}
}
return mmappedChunks
}
// Truncate removes old data before mint from the head. // Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) (err error) { func (h *Head) Truncate(mint int64) (err error) {
defer func() { defer func() {
@ -662,6 +779,11 @@ func (h *Head) Truncate(mint int64) (err error) {
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start)) level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
h.metrics.gcDuration.Observe(time.Since(start).Seconds()) h.metrics.gcDuration.Observe(time.Since(start).Seconds())
// Truncate the chunk m-mapper.
if err := h.chunkDiskMapper.Truncate(mint); err != nil {
return errors.Wrap(err, "truncate chunks.HeadReadWriter")
}
if h.wal == nil { if h.wal == nil {
return nil return nil
} }
@ -947,6 +1069,7 @@ type headAppender struct {
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.minValidTime { if t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc()
return 0, storage.ErrOutOfBounds return 0, storage.ErrOutOfBounds
} }
@ -973,6 +1096,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if t < a.minValidTime { if t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc()
return storage.ErrOutOfBounds return storage.ErrOutOfBounds
} }
@ -983,6 +1107,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
s.Lock() s.Lock()
if err := s.appendable(t, v); err != nil { if err := s.appendable(t, v); err != nil {
s.Unlock() s.Unlock()
if err == storage.ErrOutOfOrderSample {
a.head.metrics.outOfOrderSamples.Inc()
}
return err return err
} }
s.pendingCommit = true s.pendingCommit = true
@ -1051,13 +1178,14 @@ func (a *headAppender) Commit() error {
for i, s := range a.samples { for i, s := range a.samples {
series = a.sampleSeries[i] series = a.sampleSeries[i]
series.Lock() series.Lock()
ok, chunkCreated := series.append(s.T, s.V, a.appendID) ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false series.pendingCommit = false
series.Unlock() series.Unlock()
if !ok { if !ok {
total-- total--
a.head.metrics.outOfOrderSamples.Inc()
} }
if chunkCreated { if chunkCreated {
a.head.metrics.chunks.Inc() a.head.metrics.chunks.Inc()
@ -1225,10 +1353,11 @@ func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReade
mint = hmin mint = hmin
} }
return &headChunkReader{ return &headChunkReader{
head: h, head: h,
mint: mint, mint: mint,
maxt: maxt, maxt: maxt,
isoState: is, isoState: is,
memChunkPool: &h.memChunkPool,
} }
} }
@ -1271,16 +1400,19 @@ func (h *Head) compactable() bool {
// Close flushes the WAL and closes the head. // Close flushes the WAL and closes the head.
func (h *Head) Close() error { func (h *Head) Close() error {
if h.wal == nil { var merr tsdb_errors.MultiError
return nil merr.Add(h.chunkDiskMapper.Close())
if h.wal != nil {
merr.Add(h.wal.Close())
} }
return h.wal.Close() return merr.Err()
} }
type headChunkReader struct { type headChunkReader struct {
head *Head head *Head
mint, maxt int64 mint, maxt int64
isoState *isolationState isoState *isolationState
memChunkPool *sync.Pool
} }
func (h *headChunkReader) Close() error { func (h *headChunkReader) Close() error {
@ -1315,10 +1447,17 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
} }
s.Lock() s.Lock()
c := s.chunk(int(cid)) c, garbageCollect := s.chunk(int(cid), h.head.chunkDiskMapper)
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
c.chunk = nil
h.memChunkPool.Put(c)
}
}()
// This means that the chunk has been garbage collected or is outside // This means that the chunk has been garbage collected (or) is outside
// the specified range. // the specified range (or) Head is closing.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock() s.Unlock()
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
@ -1326,23 +1465,25 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Unlock() s.Unlock()
return &safeChunk{ return &safeChunk{
Chunk: c.chunk, Chunk: c.chunk,
s: s, s: s,
cid: int(cid), cid: int(cid),
isoState: h.isoState, isoState: h.isoState,
chunkDiskMapper: h.head.chunkDiskMapper,
}, nil }, nil
} }
type safeChunk struct { type safeChunk struct {
chunkenc.Chunk chunkenc.Chunk
s *memSeries s *memSeries
cid int cid int
isoState *isolationState isoState *isolationState
chunkDiskMapper *chunks.ChunkDiskMapper
} }
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock() c.s.Lock()
it := c.s.iterator(c.cid, c.isoState, reuseIter) it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter)
c.s.Unlock() c.s.Unlock()
return it return it
} }
@ -1448,23 +1589,24 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
*chks = (*chks)[:0] *chks = (*chks)[:0]
for i, c := range s.chunks { for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range. // Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) { if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue continue
} }
// Set the head chunks as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime = math.MaxInt64
}
*chks = append(*chks, chunks.Meta{ *chks = append(*chks, chunks.Meta{
MinTime: c.minTime, MinTime: c.minTime,
MaxTime: maxTime, MaxTime: c.maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))), Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
}) })
} }
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{
MinTime: s.headChunk.minTime,
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))),
})
}
return nil return nil
} }
@ -1485,7 +1627,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
} }
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) { func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
s := newMemSeries(lset, id, h.chunkRange) s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)
s, created := h.series.getOrSet(hash, s) s, created := h.series.getOrSet(hash, s)
if !created { if !created {
@ -1611,7 +1753,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
series.Lock() series.Lock()
rmChunks += series.truncateChunksBefore(mint) rmChunks += series.truncateChunksBefore(mint)
if len(series.chunks) > 0 || series.pendingCommit { if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
series.Unlock() series.Unlock()
continue continue
} }
@ -1704,12 +1846,12 @@ func (s sample) V() float64 {
type memSeries struct { type memSeries struct {
sync.RWMutex sync.RWMutex
ref uint64 ref uint64
lset labels.Labels lset labels.Labels
chunks []*memChunk mmappedChunks []*mmappedChunk
headChunk *memChunk headChunk *memChunk
chunkRange int64 chunkRange int64
firstChunkID int firstChunkID int
nextAt int64 // Timestamp at which to cut the next chunk. nextAt int64 // Timestamp at which to cut the next chunk.
sampleBuf [4]sample sampleBuf [4]sample
@ -1717,25 +1859,31 @@ type memSeries struct {
app chunkenc.Appender // Current appender for the chunk. app chunkenc.Appender // Current appender for the chunk.
memChunkPool *sync.Pool
txs *txRing txs *txRing
} }
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
ref: id, ref: id,
chunkRange: chunkRange, chunkRange: chunkRange,
nextAt: math.MinInt64, nextAt: math.MinInt64,
txs: newTxRing(4), txs: newTxRing(4),
memChunkPool: memChunkPool,
} }
return s return s
} }
func (s *memSeries) minTime() int64 { func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 { if len(s.mmappedChunks) > 0 {
return math.MinInt64 return s.mmappedChunks[0].minTime
} }
return s.chunks[0].minTime if s.headChunk != nil {
return s.headChunk.minTime
}
return math.MinInt64
} }
func (s *memSeries) maxTime() int64 { func (s *memSeries) maxTime() int64 {
@ -1746,30 +1894,45 @@ func (s *memSeries) maxTime() int64 {
return c.maxTime return c.maxTime
} }
func (s *memSeries) cut(mint int64) *memChunk { func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
c := &memChunk{ s.mmapCurrentHeadChunk(chunkDiskMapper)
s.headChunk = &memChunk{
chunk: chunkenc.NewXORChunk(), chunk: chunkenc.NewXORChunk(),
minTime: mint, minTime: mint,
maxTime: math.MinInt64, maxTime: math.MinInt64,
} }
s.chunks = append(s.chunks, c)
s.headChunk = c
// Remove exceeding capacity from the previous chunk byte slice to save memory.
if l := len(s.chunks); l > 1 {
s.chunks[l-2].chunk.Compact()
}
// Set upper bound on when the next chunk must be started. An earlier timestamp // Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point. // may be chosen dynamically at a later point.
s.nextAt = rangeForTimestamp(mint, s.chunkRange) s.nextAt = rangeForTimestamp(mint, s.chunkRange)
app, err := c.chunk.Appender() app, err := s.headChunk.chunk.Appender()
if err != nil { if err != nil {
panic(err) panic(err)
} }
s.app = app s.app = app
return c return s.headChunk
}
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
if s.headChunk == nil {
// There is no head chunk, so nothing to m-map here.
return
}
chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)
if err != nil {
if err != chunks.ErrChunkDiskMapperClosed {
panic(err)
}
}
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(s.headChunk.chunk.NumSamples()),
minTime: s.headChunk.minTime,
maxTime: s.headChunk.maxTime,
})
} }
// appendable checks whether the given sample is valid for appending to the series. // appendable checks whether the given sample is valid for appending to the series.
@ -1793,12 +1956,34 @@ func (s *memSeries) appendable(t int64, v float64) error {
return nil return nil
} }
func (s *memSeries) chunk(id int) *memChunk { // chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage.
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := id - s.firstChunkID ix := id - s.firstChunkID
if ix < 0 || ix >= len(s.chunks) { if ix < 0 || ix > len(s.mmappedChunks) {
return nil return nil, false
} }
return s.chunks[ix] if ix == len(s.mmappedChunks) {
return s.headChunk, false
}
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil {
if err == chunks.ErrChunkDiskMapperClosed {
return nil, false
}
// TODO(codesome): Find a better way to handle this error instead of a panic.
panic(err)
}
mc := s.memChunkPool.Get().(*memChunk)
mc.chunk = chk
mc.minTime = s.mmappedChunks[ix].minTime
mc.maxTime = s.mmappedChunks[ix].maxTime
return mc, true
} }
func (s *memSeries) chunkID(pos int) int { func (s *memSeries) chunkID(pos int) int {
@ -1809,27 +1994,32 @@ func (s *memSeries) chunkID(pos int) int {
// at or after mint. Chunk IDs remain unchanged. // at or after mint. Chunk IDs remain unchanged.
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
var k int var k int
for i, c := range s.chunks { if s.headChunk != nil && s.headChunk.maxTime < mint {
if c.maxTime >= mint { // If head chunk is truncated, we can truncate all mmapped chunks.
break k = 1 + len(s.mmappedChunks)
} s.firstChunkID += k
k = i + 1
}
s.chunks = append(s.chunks[:0], s.chunks[k:]...)
s.firstChunkID += k
if len(s.chunks) == 0 {
s.headChunk = nil s.headChunk = nil
} else { s.mmappedChunks = nil
s.headChunk = s.chunks[len(s.chunks)-1] return k
}
if len(s.mmappedChunks) > 0 {
for i, c := range s.mmappedChunks {
if c.maxTime >= mint {
break
}
k = i + 1
}
s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[k:]...)
s.firstChunkID += k
} }
return k return k
} }
// append adds the sample (t, v) to the series. The caller also has to provide // append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no // the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.) // isolation for this append.)
func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) { // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio // Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases // so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples. // the time range within which we have to decompress all samples.
@ -1838,7 +2028,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC
c := s.head() c := s.head()
if c == nil { if c == nil {
c = s.cut(t) if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
// Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it.
return false, false
}
// There is no chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, chunkDiskMapper)
chunkCreated = true chunkCreated = true
} }
numSamples := c.chunk.NumSamples() numSamples := c.chunk.NumSamples()
@ -1854,7 +2049,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkC
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
} }
if t >= s.nextAt { if t >= s.nextAt {
c = s.cut(t) c = s.cutNewHeadChunk(t, chunkDiskMapper)
chunkCreated = true chunkCreated = true
} }
s.app.Append(t, v) s.app.Append(t, v)
@ -1890,8 +2085,19 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a return start + (max-start)/a
} }
func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { // iterator returns a chunk iterator.
c := s.chunk(id) // It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect := s.chunk(id, chunkDiskMapper)
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
// This should be done always at the end.
c.chunk = nil
s.memChunkPool.Put(c)
}
}()
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a // TODO(fabxc): Work around! A querier may have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got // series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any // accessed. We must ensure to not garbage collect as long as any
@ -1909,12 +2115,18 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera
totalSamples := 0 // Total samples in this series. totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk. previousSamples := 0 // Samples before this chunk.
for j, d := range s.chunks { for j, d := range s.mmappedChunks {
totalSamples += d.chunk.NumSamples() totalSamples += int(d.numSamples)
if j < ix { if j < ix {
previousSamples += d.chunk.NumSamples() previousSamples += int(d.numSamples)
} }
} }
// mmappedChunks does not contain the last chunk. Hence check it separately.
if len(s.mmappedChunks) < ix {
previousSamples += s.headChunk.chunk.NumSamples()
} else {
totalSamples += s.headChunk.chunk.NumSamples()
}
// Removing the extra transactionIDs that are relevant for samples that // Removing the extra transactionIDs that are relevant for samples that
// come after this chunk, from the total transactionIDs. // come after this chunk, from the total transactionIDs.
@ -1943,7 +2155,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Itera
return chunkenc.NewNopIterator() return chunkenc.NewNopIterator()
} }
if id-s.firstChunkID < len(s.chunks)-1 { if id-s.firstChunkID < len(s.mmappedChunks) {
if stopAfter == numSamples { if stopAfter == numSamples {
return c.chunk.Iterator(it) return c.chunk.Iterator(it)
} }
@ -1989,7 +2201,7 @@ type memChunk struct {
minTime, maxTime int64 minTime, maxTime int64
} }
// Returns true if the chunk overlaps [mint, maxt]. // OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime return mc.minTime <= maxt && mint <= mc.maxTime
} }
@ -2052,3 +2264,14 @@ func (ss stringset) slice() []string {
sort.Strings(slice) sort.Strings(slice)
return slice return slice
} }
type mmappedChunk struct {
ref uint64
numSamples uint16
minTime, maxTime int64
}
// Returns true if the chunk overlaps [mint, maxt].
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}

View file

@ -14,6 +14,8 @@
package tsdb package tsdb
import ( import (
"io/ioutil"
"os"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -23,8 +25,13 @@ import (
) )
func BenchmarkHeadStripeSeriesCreate(b *testing.B) { func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err) testutil.Ok(b, err)
defer h.Close() defer h.Close()
@ -34,8 +41,13 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
} }
func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
// Put a series, select it. GC it and then access it. // Put a series, select it. GC it and then access it.
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err) testutil.Ok(b, err)
defer h.Close() defer h.Close()

File diff suppressed because it is too large Load diff

View file

@ -30,7 +30,12 @@ const (
) )
func BenchmarkPostingsForMatchers(b *testing.B) { func BenchmarkPostingsForMatchers(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err) testutil.Ok(b, err)
defer func() { defer func() {
testutil.Ok(b, h.Close()) testutil.Ok(b, h.Close())
@ -126,7 +131,12 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
} }
func BenchmarkQuerierSelect(b *testing.B) { func BenchmarkQuerierSelect(b *testing.B) {
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(b, err) testutil.Ok(b, err)
defer h.Close() defer h.Close()
app := h.Appender() app := h.Appender()

View file

@ -1857,7 +1857,12 @@ func TestFindSetMatches(t *testing.T) {
} }
func TestPostingsForMatchers(t *testing.T) { func TestPostingsForMatchers(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(chunkDir))
}()
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize)
testutil.Ok(t, err) testutil.Ok(t, err)
defer func() { defer func() {
testutil.Ok(t, h.Close()) testutil.Ok(t, h.Close())
@ -2220,7 +2225,12 @@ func BenchmarkQueries(b *testing.B) {
queryTypes["_3-Blocks"] = &querier{blocks: qs[0:3]} queryTypes["_3-Blocks"] = &querier{blocks: qs[0:3]}
queryTypes["_10-Blocks"] = &querier{blocks: qs} queryTypes["_10-Blocks"] = &querier{blocks: qs}
head := createHead(b, series) chunkDir, err := ioutil.TempDir("", "chunk_dir")
testutil.Ok(b, err)
defer func() {
testutil.Ok(b, os.RemoveAll(chunkDir))
}()
head := createHead(b, series, chunkDir)
qHead, err := NewBlockQuerier(head, 1, int64(nSamples)) qHead, err := NewBlockQuerier(head, 1, int64(nSamples))
testutil.Ok(b, err) testutil.Ok(b, err)
queryTypes["_Head"] = qHead queryTypes["_Head"] = qHead
@ -2232,6 +2242,7 @@ func BenchmarkQueries(b *testing.B) {
benchQuery(b, expExpansions, querier, selectors) benchQuery(b, expExpansions, querier, selectors)
}) })
} }
testutil.Ok(b, head.Close())
} }
} }
} }

View file

@ -32,8 +32,9 @@ type MetricSample struct {
} }
// CreateHead creates a TSDB writer head to write the sample data to. // CreateHead creates a TSDB writer head to write the sample data to.
func CreateHead(samples []*MetricSample, chunkRange int64, logger log.Logger) (*Head, error) { func CreateHead(samples []*MetricSample, chunkRange int64, chunkDir string, logger log.Logger) (*Head, error) {
head, err := NewHead(nil, logger, nil, chunkRange, DefaultStripeSize) head, err := NewHead(nil, logger, nil, chunkRange, chunkDir, nil, DefaultStripeSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -60,10 +61,15 @@ func CreateBlock(samples []*MetricSample, dir string, mint, maxt int64, logger l
if chunkRange < 0 { if chunkRange < 0 {
return "", ErrInvalidTimes return "", ErrInvalidTimes
} }
head, err := CreateHead(samples, chunkRange, logger) chunkDir := filepath.Join(dir, "chunks_tmp")
defer func() {
os.RemoveAll(chunkDir)
}()
head, err := CreateHead(samples, chunkRange, chunkDir, logger)
if err != nil { if err != nil {
return "", err return "", err
} }
defer head.Close()
compactor, err := NewLeveledCompactor(context.Background(), nil, logger, ExponentialBlockRanges(DefaultBlockDuration, 3, 5), nil) compactor, err := NewLeveledCompactor(context.Background(), nil, logger, ExponentialBlockRanges(DefaultBlockDuration, 3, 5), nil)
if err != nil { if err != nil {

View file

@ -1898,8 +1898,18 @@ type fakeDB struct {
func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) CleanTombstones() error { return f.err }
func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err } func (f *fakeDB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { return f.err }
func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err } func (f *fakeDB) Snapshot(dir string, withHead bool) error { return f.err }
func (f *fakeDB) Stats(statsByLabelName string) (*tsdb.Stats, error) { func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) {
h, _ := tsdb.NewHead(nil, nil, nil, 1000, tsdb.DefaultStripeSize) dbDir, err := ioutil.TempDir("", "tsdb-api-ready")
if err != nil {
return nil, err
}
defer func() {
err := os.RemoveAll(dbDir)
if retErr != nil {
retErr = err
}
}()
h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize)
return h.Stats(statsByLabelName), nil return h.Stats(statsByLabelName), nil
} }

View file

@ -100,6 +100,7 @@ func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) {
func TestReadyAndHealthy(t *testing.T) { func TestReadyAndHealthy(t *testing.T) {
t.Parallel() t.Parallel()
dbDir, err := ioutil.TempDir("", "tsdb-ready") dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err) testutil.Ok(t, err)
defer testutil.Ok(t, os.RemoveAll(dbDir)) defer testutil.Ok(t, os.RemoveAll(dbDir))
@ -426,7 +427,6 @@ func TestDebugHandler(t *testing.T) {
func TestHTTPMetrics(t *testing.T) { func TestHTTPMetrics(t *testing.T) {
t.Parallel() t.Parallel()
handler := New(nil, &Options{ handler := New(nil, &Options{
RoutePrefix: "/", RoutePrefix: "/",
ListenAddress: "somehost:9090", ListenAddress: "somehost:9090",