mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-25 12:42:47 -08:00
Add option to customise head chunks write buffer size (#8201)
* Add option to customise head chunks write buffer size Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed tests Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
parent
1797192f02
commit
db19e05d93
|
@ -322,7 +322,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
|
func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head {
|
||||||
head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, DefaultStripeSize, nil)
|
head, err := NewHead(nil, nil, w, DefaultBlockDuration, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(tb, err)
|
require.NoError(tb, err)
|
||||||
|
|
||||||
app := head.Appender(context.Background())
|
app := head.Appender(context.Background())
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BlockWriter is a block writer that allows appending and flushing series to disk.
|
// BlockWriter is a block writer that allows appending and flushing series to disk.
|
||||||
|
@ -67,7 +68,7 @@ func (w *BlockWriter) initHead() error {
|
||||||
}
|
}
|
||||||
w.chunkDir = chunkDir
|
w.chunkDir = chunkDir
|
||||||
|
|
||||||
h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, w.logger, nil, w.blockSize, w.chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "tsdb.NewHead")
|
return errors.Wrap(err, "tsdb.NewHead")
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,6 @@ const (
|
||||||
MagicHeadChunks = 0x0130BC91
|
MagicHeadChunks = 0x0130BC91
|
||||||
|
|
||||||
headChunksFormatV1 = 1
|
headChunksFormatV1 = 1
|
||||||
writeBufferSize = 4 * 1024 * 1024 // 4 MiB.
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -63,6 +62,12 @@ const (
|
||||||
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
|
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
|
||||||
// Max because the uvarint size can be smaller.
|
// Max because the uvarint size can be smaller.
|
||||||
MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize
|
MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize
|
||||||
|
// MinWriteBufferSize is the minimum write buffer size allowed.
|
||||||
|
MinWriteBufferSize = 64 * 1024 // 64KB.
|
||||||
|
// MaxWriteBufferSize is the maximum write buffer size allowed.
|
||||||
|
MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB.
|
||||||
|
// DefaultWriteBufferSize is the default write buffer size.
|
||||||
|
DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
|
||||||
)
|
)
|
||||||
|
|
||||||
// corruptionErr is an error that's returned when corruption is encountered.
|
// corruptionErr is an error that's returned when corruption is encountered.
|
||||||
|
@ -82,7 +87,8 @@ type ChunkDiskMapper struct {
|
||||||
curFileNumBytes atomic.Int64 // Bytes written in current open file.
|
curFileNumBytes atomic.Int64 // Bytes written in current open file.
|
||||||
|
|
||||||
/// Writer.
|
/// Writer.
|
||||||
dir *os.File
|
dir *os.File
|
||||||
|
writeBufferSize int
|
||||||
|
|
||||||
curFile *os.File // File being written to.
|
curFile *os.File // File being written to.
|
||||||
curFileSequence int // Index of current open file being appended to.
|
curFileSequence int // Index of current open file being appended to.
|
||||||
|
@ -121,7 +127,15 @@ type mmappedChunkFile struct {
|
||||||
// using the default head chunk file duration.
|
// using the default head chunk file duration.
|
||||||
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
|
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
|
||||||
// to set the maxt of all the file.
|
// to set the maxt of all the file.
|
||||||
func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) {
|
func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*ChunkDiskMapper, error) {
|
||||||
|
// Validate write buffer size.
|
||||||
|
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
|
||||||
|
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxHeadChunkFileSize, writeBufferSize)
|
||||||
|
}
|
||||||
|
if writeBufferSize%1024 != 0 {
|
||||||
|
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
|
||||||
|
}
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -131,10 +145,11 @@ func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &ChunkDiskMapper{
|
m := &ChunkDiskMapper{
|
||||||
dir: dirFile,
|
dir: dirFile,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
crc32: newCRC32(),
|
writeBufferSize: writeBufferSize,
|
||||||
chunkBuffer: newChunkBuffer(),
|
crc32: newCRC32(),
|
||||||
|
chunkBuffer: newChunkBuffer(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.pool == nil {
|
if m.pool == nil {
|
||||||
|
@ -273,7 +288,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
|
||||||
|
|
||||||
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
|
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
|
||||||
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
|
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
|
||||||
if len(chk.Bytes())+MaxHeadChunkMetaSize < writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
|
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
|
||||||
if err := cdm.flushBuffer(); err != nil {
|
if err := cdm.flushBuffer(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -313,7 +328,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c
|
||||||
|
|
||||||
cdm.chunkBuffer.put(chkRef, chk)
|
cdm.chunkBuffer.put(chkRef, chk)
|
||||||
|
|
||||||
if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize {
|
if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
|
||||||
// The chunk was bigger than the buffer itself.
|
// The chunk was bigger than the buffer itself.
|
||||||
// Flushing to not keep partial chunks in buffer.
|
// Flushing to not keep partial chunks in buffer.
|
||||||
if err := cdm.flushBuffer(); err != nil {
|
if err := cdm.flushBuffer(); err != nil {
|
||||||
|
@ -382,7 +397,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) {
|
||||||
if cdm.chkWriter != nil {
|
if cdm.chkWriter != nil {
|
||||||
cdm.chkWriter.Reset(newFile)
|
cdm.chkWriter.Reset(newFile)
|
||||||
} else {
|
} else {
|
||||||
cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize)
|
cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
cdm.closers[cdm.curFileSequence] = mmapFile
|
cdm.closers[cdm.curFileSequence] = mmapFile
|
||||||
|
|
|
@ -133,7 +133,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
||||||
// Testing IterateAllChunks method.
|
// Testing IterateAllChunks method.
|
||||||
dir := hrw.dir.Name()
|
dir := hrw.dir.Name()
|
||||||
require.NoError(t, hrw.Close())
|
require.NoError(t, hrw.Close())
|
||||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
idx := 0
|
idx := 0
|
||||||
|
@ -223,7 +223,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
|
||||||
|
|
||||||
// Restarted.
|
// Restarted.
|
||||||
var err error
|
var err error
|
||||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.False(t, hrw.fileMaxtSet)
|
require.False(t, hrw.fileMaxtSet)
|
||||||
|
@ -316,7 +316,7 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
||||||
|
|
||||||
// Restarting checks for unsequential files.
|
// Restarting checks for unsequential files.
|
||||||
var err error
|
var err error
|
||||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
verifyFiles([]int{3, 4, 5, 6, 7})
|
verifyFiles([]int{3, 4, 5, 6, 7})
|
||||||
}
|
}
|
||||||
|
@ -337,7 +337,7 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
||||||
require.NoError(t, hrw.Close())
|
require.NoError(t, hrw.Close())
|
||||||
|
|
||||||
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
|
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
|
||||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Forcefully failing IterateAllChunks.
|
// Forcefully failing IterateAllChunks.
|
||||||
|
@ -394,7 +394,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
// Open chunk disk mapper again, corrupt file should be removed.
|
// Open chunk disk mapper again, corrupt file should be removed.
|
||||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, hrw.fileMaxtSet)
|
require.False(t, hrw.fileMaxtSet)
|
||||||
require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
||||||
|
@ -425,7 +425,7 @@ func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper {
|
||||||
require.NoError(t, os.RemoveAll(tmpdir))
|
require.NoError(t, os.RemoveAll(tmpdir))
|
||||||
})
|
})
|
||||||
|
|
||||||
hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
|
hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.False(t, hrw.fileMaxtSet)
|
require.False(t, hrw.fileMaxtSet)
|
||||||
require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
require.NoError(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
||||||
|
|
|
@ -1093,7 +1093,7 @@ func BenchmarkCompactionFromHead(b *testing.B) {
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(b, os.RemoveAll(chunkDir))
|
require.NoError(b, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
for ln := 0; ln < labelNames; ln++ {
|
for ln := 0; ln < labelNames; ln++ {
|
||||||
app := h.Appender(context.Background())
|
app := h.Appender(context.Background())
|
||||||
|
|
33
tsdb/db.go
33
tsdb/db.go
|
@ -39,6 +39,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||||
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
|
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
|
||||||
|
@ -67,14 +68,15 @@ var (
|
||||||
// millisecond precision timestamps.
|
// millisecond precision timestamps.
|
||||||
func DefaultOptions() *Options {
|
func DefaultOptions() *Options {
|
||||||
return &Options{
|
return &Options{
|
||||||
WALSegmentSize: wal.DefaultSegmentSize,
|
WALSegmentSize: wal.DefaultSegmentSize,
|
||||||
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
|
RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
|
||||||
MinBlockDuration: DefaultBlockDuration,
|
MinBlockDuration: DefaultBlockDuration,
|
||||||
MaxBlockDuration: DefaultBlockDuration,
|
MaxBlockDuration: DefaultBlockDuration,
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
AllowOverlappingBlocks: false,
|
AllowOverlappingBlocks: false,
|
||||||
WALCompression: false,
|
WALCompression: false,
|
||||||
StripeSize: DefaultStripeSize,
|
StripeSize: DefaultStripeSize,
|
||||||
|
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,6 +124,9 @@ type Options struct {
|
||||||
// Typically it is in milliseconds.
|
// Typically it is in milliseconds.
|
||||||
MaxBlockDuration int64
|
MaxBlockDuration int64
|
||||||
|
|
||||||
|
// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
|
||||||
|
HeadChunksWriteBufferSize int
|
||||||
|
|
||||||
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
|
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
|
||||||
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
|
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
|
||||||
SeriesLifecycleCallback SeriesLifecycleCallback
|
SeriesLifecycleCallback SeriesLifecycleCallback
|
||||||
|
@ -328,7 +333,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
|
head, err := NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -381,7 +386,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
||||||
blocks[i] = b
|
blocks[i] = b
|
||||||
}
|
}
|
||||||
|
|
||||||
head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
|
head, err := NewHead(nil, db.logger, nil, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -399,7 +404,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, DefaultStripeSize, nil)
|
head, err = NewHead(nil, db.logger, w, DefaultBlockDuration, db.dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -531,7 +536,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
|
||||||
if opts.StripeSize <= 0 {
|
if opts.StripeSize <= 0 {
|
||||||
opts.StripeSize = DefaultStripeSize
|
opts.StripeSize = DefaultStripeSize
|
||||||
}
|
}
|
||||||
|
if opts.HeadChunksWriteBufferSize <= 0 {
|
||||||
|
opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize
|
||||||
|
}
|
||||||
if opts.MinBlockDuration <= 0 {
|
if opts.MinBlockDuration <= 0 {
|
||||||
opts.MinBlockDuration = DefaultBlockDuration
|
opts.MinBlockDuration = DefaultBlockDuration
|
||||||
}
|
}
|
||||||
|
@ -642,7 +649,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.StripeSize, opts.SeriesLifecycleCallback)
|
db.head, err = NewHead(r, l, wlog, rngs[0], dir, db.chunkPool, opts.HeadChunksWriteBufferSize, opts.StripeSize, opts.SeriesLifecycleCallback)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,9 +193,9 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
||||||
require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
|
require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestNoPanicAfterWALCorrutpion ensures that querying the db after a WAL corruption doesn't cause a panic.
|
// TestNoPanicAfterWALCorruption ensures that querying the db after a WAL corruption doesn't cause a panic.
|
||||||
// https://github.com/prometheus/prometheus/issues/7548
|
// https://github.com/prometheus/prometheus/issues/7548
|
||||||
func TestNoPanicAfterWALCorrutpion(t *testing.T) {
|
func TestNoPanicAfterWALCorruption(t *testing.T) {
|
||||||
db := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil)
|
db := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil)
|
||||||
|
|
||||||
// Append until the first mmaped head chunk.
|
// Append until the first mmaped head chunk.
|
||||||
|
|
|
@ -295,7 +295,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
|
||||||
// stripeSize sets the number of entries in the hash map, it must be a power of 2.
|
// stripeSize sets the number of entries in the hash map, it must be a power of 2.
|
||||||
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
|
// A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
|
||||||
// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series.
|
// A smaller stripeSize reduces the memory allocated, but can decrease performance with large number of series.
|
||||||
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) {
|
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, chkPool chunkenc.Pool, chkWriteBufferSize, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) {
|
||||||
if l == nil {
|
if l == nil {
|
||||||
l = log.NewNopLogger()
|
l = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
|
@ -328,12 +328,12 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
|
||||||
h.lastWALTruncationTime.Store(math.MinInt64)
|
h.lastWALTruncationTime.Store(math.MinInt64)
|
||||||
h.metrics = newHeadMetrics(h, r)
|
h.metrics = newHeadMetrics(h, r)
|
||||||
|
|
||||||
if pool == nil {
|
if chkPool == nil {
|
||||||
pool = chunkenc.NewPool()
|
chkPool = chunkenc.NewPool()
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), pool)
|
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(mmappedChunksDir(chkDirRoot), chkPool, chkWriteBufferSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
|
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
|
||||||
|
@ -32,7 +33,7 @@ func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
|
||||||
require.NoError(b, os.RemoveAll(chunkDir))
|
require.NoError(b, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
// Put a series, select it. GC it and then access it.
|
// Put a series, select it. GC it and then access it.
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
|
@ -48,7 +49,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
|
||||||
require.NoError(b, os.RemoveAll(chunkDir))
|
require.NoError(b, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
// Put a series, select it. GC it and then access it.
|
// Put a series, select it. GC it and then access it.
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
|
||||||
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
||||||
|
@ -191,7 +191,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
||||||
|
|
||||||
// Load the WAL.
|
// Load the WAL.
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
h.Init(0)
|
h.Init(0)
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ func TestHead_WALMultiRef(t *testing.T) {
|
||||||
w, err = wal.New(nil, nil, w.Dir(), false)
|
w, err = wal.New(nil, nil, w.Dir(), false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, DefaultStripeSize, nil)
|
head, err = NewHead(nil, nil, w, 1000, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, head.Init(0))
|
require.NoError(t, head.Init(0))
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -421,7 +421,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||||
require.NoError(t, os.RemoveAll(dir))
|
require.NoError(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
// This is usually taken from the Head, but passing manually here.
|
// This is usually taken from the Head, but passing manually here.
|
||||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool())
|
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, chunkDiskMapper.Close())
|
require.NoError(t, chunkDiskMapper.Close())
|
||||||
|
@ -583,7 +583,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
||||||
// Compare the samples for both heads - before and after the reloadBlocks.
|
// Compare the samples for both heads - before and after the reloadBlocks.
|
||||||
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
|
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil)
|
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, reloadedHead.Init(0))
|
require.NoError(t, reloadedHead.Init(0))
|
||||||
|
|
||||||
|
@ -963,7 +963,7 @@ func TestMemSeries_append(t *testing.T) {
|
||||||
require.NoError(t, os.RemoveAll(dir))
|
require.NoError(t, os.RemoveAll(dir))
|
||||||
}()
|
}()
|
||||||
// This is usually taken from the Head, but passing manually here.
|
// This is usually taken from the Head, but passing manually here.
|
||||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool())
|
chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, chunkDiskMapper.Close())
|
require.NoError(t, chunkDiskMapper.Close())
|
||||||
|
@ -1265,7 +1265,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
|
||||||
require.NoError(t, w.Log(test.rec))
|
require.NoError(t, w.Log(test.rec))
|
||||||
}
|
}
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, w, 1, w.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
|
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
|
||||||
initErr := h.Init(math.MinInt64)
|
initErr := h.Init(math.MinInt64)
|
||||||
|
@ -1320,7 +1320,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
||||||
w, err := wal.New(nil, nil, walDir, false)
|
w, err := wal.New(nil, nil, walDir, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, w, chunkRange, dir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, w, chunkRange, dir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
|
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
|
||||||
require.NoError(t, h.Init(math.MinInt64))
|
require.NoError(t, h.Init(math.MinInt64))
|
||||||
|
@ -1550,7 +1550,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
||||||
|
|
||||||
wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false)
|
wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, DefaultStripeSize, nil)
|
hb, err = NewHead(nil, nil, wlog, 1000, wlog.Dir(), nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
defer func() { require.NoError(t, hb.Close()) }()
|
defer func() { require.NoError(t, hb.Close()) }()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, hb.Init(0))
|
require.NoError(t, hb.Init(0))
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Make entries ~50B in size, to emulate real-world high cardinality.
|
// Make entries ~50B in size, to emulate real-world high cardinality.
|
||||||
|
@ -37,7 +38,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(b, os.RemoveAll(chunkDir))
|
require.NoError(b, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(b, h.Close())
|
require.NoError(b, h.Close())
|
||||||
|
@ -146,7 +147,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(b, os.RemoveAll(chunkDir))
|
require.NoError(b, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
app := h.Appender(context.Background())
|
app := h.Appender(context.Background())
|
||||||
|
|
|
@ -407,7 +407,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run("", func(t *testing.T) {
|
||||||
h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 2*time.Hour.Milliseconds(), "", nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer h.Close()
|
defer h.Close()
|
||||||
|
|
||||||
|
@ -1550,7 +1550,7 @@ func TestPostingsForMatchers(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, os.RemoveAll(chunkDir))
|
require.NoError(t, os.RemoveAll(chunkDir))
|
||||||
}()
|
}()
|
||||||
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, DefaultStripeSize, nil)
|
h, err := NewHead(nil, nil, nil, 1000, chunkDir, nil, chunks.DefaultWriteBufferSize, DefaultStripeSize, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
|
|
@ -56,6 +56,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/remote"
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
"github.com/prometheus/prometheus/tsdb"
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
"github.com/prometheus/prometheus/util/teststorage"
|
"github.com/prometheus/prometheus/util/teststorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2126,7 +2127,7 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) {
|
||||||
retErr = err
|
retErr = err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, tsdb.DefaultStripeSize, nil)
|
h, _ := tsdb.NewHead(nil, nil, nil, 1000, "", nil, chunks.DefaultWriteBufferSize, tsdb.DefaultStripeSize, nil)
|
||||||
return h.Stats(statsByLabelName), nil
|
return h.Stats(statsByLabelName), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue