diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index ce9b94d85b..5cdd2e81f0 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -163,3 +163,22 @@ func (c *chunkWriteQueue) stop() { c.workerWg.Wait() } + +func (c *chunkWriteQueue) queueIsEmpty() bool { + return c.queueSize() == 0 +} + +func (c *chunkWriteQueue) queueIsFull() bool { + // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh + // because one job is currently being processed and blocked in the writer. + return c.queueSize() == cap(c.jobs)+1 +} + +func (c *chunkWriteQueue) queueSize() int { + c.chunkRefMapMtx.Lock() + defer c.chunkRefMapMtx.Unlock() + + // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has + // been fully processed, it remains in the chunkRefMap until the processing is complete. + return len(c.chunkRefMap) +} diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go index 251c96395a..0e971a336b 100644 --- a/tsdb/chunks/chunk_write_queue_test.go +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -146,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { } // The queue should be full. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Adding another job should block as long as no job from the queue gets consumed. addedJob := atomic.NewBool(false) @@ -166,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10) // The queue should be full again. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Consume +1 jobs from the queue. // To drain the queue we need to consume +1 jobs because 1 job // is already in the state of being processed. for job := 0; job < sizeLimit+1; job++ { - require.False(t, queueIsEmpty(q)) + require.False(t, q.queueIsEmpty()) unblockChunkWriter() } // Wait until all jobs have been processed. callbackWg.Wait() - require.True(t, queueIsEmpty(q)) + require.True(t, q.queueIsEmpty()) } func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { @@ -266,22 +266,3 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) { }) } } - -func queueIsEmpty(q *chunkWriteQueue) bool { - return queueSize(q) == 0 -} - -func queueIsFull(q *chunkWriteQueue) bool { - // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh - // because one job is currently being processed and blocked in the writer. - return queueSize(q) == cap(q.jobs)+1 -} - -func queueSize(q *chunkWriteQueue) int { - q.chunkRefMapMtx.Lock() - defer q.chunkRefMapMtx.Unlock() - - // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has - // been fully processed, it remains in the chunkRefMap until the processing is complete. - return len(q.chunkRefMap) -} diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 3f62d9f008..1b5845808a 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -61,7 +61,7 @@ const ( CRCSize = 4 // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. // Max because the uvarint size can be smaller. - MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + MaxChunkLengthFieldSize + CRCSize + MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize // MinWriteBufferSize is the minimum write buffer size allowed. MinWriteBufferSize = 64 * 1024 // 64KB. // MaxWriteBufferSize is the maximum write buffer size allowed. @@ -473,6 +473,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() { cdm.evtlPos.cutFileOnNextChunk() } +func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { + return cdm.writeQueue.queueIsEmpty() +} + // cutAndExpectRef creates a new m-mapped file. // The write lock should be held before calling this. // It ensures that the position in the new file matches the given chunk reference, if not then it errors.