Disable chunk write queue by default, allow user to configure the exact size (#10425)

* Disable chunk write queue by default

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* update flag description

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>
Mauro Stettler 2022-03-11 13:26:59 -03:00 committed by GitHub
parent ead032919a
commit b025390cb4
3 changed files with 64 additions and 16 deletions

cmd/prometheus/main.go

@@ -305,6 +305,9 @@ func main() {
 	serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL.").
 		Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression)

+	serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
+		Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
+
 	agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage.").
 		Default("data-agent/").StringVar(&cfg.agentStoragePath)
@@ -1484,6 +1487,7 @@ type tsdbOptions struct {
 	NoLockfile               bool
 	AllowOverlappingBlocks   bool
 	WALCompression           bool
+	HeadChunksWriteQueueSize int
 	StripeSize               int
 	MinBlockDuration         model.Duration
 	MaxBlockDuration         model.Duration
@@ -1501,6 +1505,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		NoLockfile:               opts.NoLockfile,
 		AllowOverlappingBlocks:   opts.AllowOverlappingBlocks,
 		WALCompression:           opts.WALCompression,
+		HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
 		StripeSize:               opts.StripeSize,
 		MinBlockDuration:         int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
 		MaxBlockDuration:         int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
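
Beyond the CLI flag, the new tsdbOptions field means programs that embed the TSDB can pick a queue size through tsdb.Options. A minimal sketch of re-enabling the queue when opening the storage directly; the tsdb.Open call, the nil logger/registerer/stats arguments, and the data directory are assumptions for illustration, only the HeadChunksWriteQueueSize field comes from this diff:

	package main

	import (
		"log"

		"github.com/prometheus/prometheus/tsdb"
	)

	func main() {
		opts := tsdb.DefaultOptions()
		// 0 (the new default) keeps the chunk write queue disabled; any
		// positive value enables the queue with that capacity.
		opts.HeadChunksWriteQueueSize = 1000000

		db, err := tsdb.Open("data/", nil, nil, opts, nil)
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()
	}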

tsdb/chunks/head_chunks.go

@@ -69,7 +69,8 @@ const (
 	// DefaultWriteBufferSize is the default write buffer size.
 	DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
 	// DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk.
-	DefaultWriteQueueSize = 1000
+	// A value of 0 completely disables this feature.
+	DefaultWriteQueueSize = 0
 )

 // ChunkDiskMapperRef represents the location of a head chunk on disk.
@@ -249,7 +250,10 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo
 		crc32:       newCRC32(),
 		chunkBuffer: newChunkBuffer(),
 	}
-	m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)
+
+	if writeQueueSize > 0 {
+		m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)
+	}

 	if m.pool == nil {
 		m.pool = chunkenc.NewPool()
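
Because a writeQueueSize of 0 now leaves m.writeQueue nil, a ChunkDiskMapper built with the default falls back to fully synchronous chunk writes. A minimal sketch of constructing the mapper directly, mirroring the test helper further down in this commit; the directory name and error handling are illustrative:

	package main

	import (
		"log"

		"github.com/prometheus/prometheus/tsdb/chunkenc"
		"github.com/prometheus/prometheus/tsdb/chunks"
	)

	func main() {
		// Queue size 0 (chunks.DefaultWriteQueueSize after this change):
		// WriteChunk writes each chunk to disk synchronously.
		// Passing a value > 0 would create the asynchronous write queue instead.
		cdm, err := chunks.NewChunkDiskMapper(nil, "chunks_dir", chunkenc.NewPool(),
			chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
		if err != nil {
			log.Fatal(err)
		}
		// Callers then run IterateAllChunks before using the mapper,
		// as the test helper below does.
		defer cdm.Close()
	}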
@@ -375,18 +379,33 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
 // WriteChunk writes the chunk to the disk.
 // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
 func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
+	// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
+	cdm.evtlPosMtx.Lock()
+	defer cdm.evtlPosMtx.Unlock()
+	ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
+
+	if cdm.writeQueue != nil {
+		return cdm.writeChunkViaQueue(ref, cutFile, seriesRef, mint, maxt, chk, callback)
+	}
+
+	err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, cutFile)
+	if callback != nil {
+		callback(err)
+	}
+	return ref
+}
+
+func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
 	var err error
-	defer func() {
-		if err != nil && callback != nil {
-			callback(err)
-		}
-	}()
-
-	// cdm.evtlPosMtx must be held to serialize the calls to .getNextChunkRef() and .addJob().
-	cdm.evtlPosMtx.Lock()
-	defer cdm.evtlPosMtx.Unlock()
-
-	ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
+	if callback != nil {
+		defer func() {
+			if err != nil {
+				callback(err)
+			}
+		}()
+	}
+
 	err = cdm.writeQueue.addJob(chunkWriteJob{
 		cutFile:   cutFile,
 		seriesRef: seriesRef,
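
The callback semantics now differ between the two paths: on the new synchronous path the callback, if provided, runs inline with the write result before WriteChunk returns, while on the queued path it is invoked by the pre-existing write queue after the job is processed (or via the deferred call above if enqueueing fails). A hypothetical caller-side sketch; the package name, helper name, and error handling are illustrative, only WriteChunk's signature comes from this diff:

	package example

	import (
		"log"

		"github.com/prometheus/prometheus/tsdb/chunkenc"
		"github.com/prometheus/prometheus/tsdb/chunks"
	)

	// writeOne persists a single head chunk and reports failures via the callback.
	func writeOne(cdm *chunks.ChunkDiskMapper, seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk) chunks.ChunkDiskMapperRef {
		return cdm.WriteChunk(seriesRef, mint, maxt, chk, func(err error) {
			// Queue disabled (the new default): this runs before WriteChunk returns.
			// Queue enabled: this runs asynchronously from the queue worker.
			if err != nil {
				log.Println("head chunk write failed:", err)
			}
		})
	}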
@@ -473,6 +492,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() {
 }

 func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
+	if cdm.writeQueue == nil {
+		return true
+	}
+
 	return cdm.writeQueue.queueIsEmpty()
 }
@@ -602,10 +625,12 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 		return nil, ErrChunkDiskMapperClosed
 	}

-	chunk := cdm.writeQueue.get(ref)
-	if chunk != nil {
-		return chunk, nil
-	}
+	if cdm.writeQueue != nil {
+		chunk := cdm.writeQueue.get(ref)
+		if chunk != nil {
+			return chunk, nil
+		}
+	}

 	sgmIndex, chkStart := ref.Unpack()
 	// We skip the series ref and the mint/maxt beforehand.
@@ -962,7 +987,9 @@ func (cdm *ChunkDiskMapper) Close() error {
 	cdm.evtlPosMtx.Lock()
 	defer cdm.evtlPosMtx.Unlock()

-	cdm.writeQueue.stop()
+	if cdm.writeQueue != nil {
+		cdm.writeQueue.stop()
+	}

 	// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
 	// The lock order should not be reversed here else it can cause deadlocks.

tsdb/chunks/head_chunks_test.go

@@ -27,6 +27,22 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )

+var writeQueueSize int
+
+func TestMain(m *testing.M) {
+	// Run all tests with the chunk write queue disabled.
+	writeQueueSize = 0
+	exitVal := m.Run()
+	if exitVal != 0 {
+		os.Exit(exitVal)
+	}
+
+	// Re-run all tests with the chunk write queue size of 1e6.
+	writeQueueSize = 1000000
+	exitVal = m.Run()
+	os.Exit(exitVal)
+}
+
 func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
 	hrw := createChunkDiskMapper(t, "")
 	defer func() {
@@ -453,7 +469,7 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper {
 		dir = t.TempDir()
 	}

-	hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize)
+	hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize)
 	require.NoError(t, err)
 	require.False(t, hrw.fileMaxtSet)
 	require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))