fixing merge mistakes

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>
This commit is contained in:
Mauro Stettler 2022-01-12 18:49:20 +00:00
parent f4d628d419
commit 6ed72dadca
No known key found for this signature in database
GPG key ID: 1EBC1C55E2D201A2
3 changed files with 28 additions and 24 deletions

View file

@ -163,3 +163,22 @@ func (c *chunkWriteQueue) stop() {
c.workerWg.Wait() c.workerWg.Wait()
} }
// queueIsEmpty reports whether there are no jobs currently tracked by the queue.
func (c *chunkWriteQueue) queueIsEmpty() bool {
	size := c.queueSize()
	return size == 0
}
// queueIsFull reports whether the queue holds the maximum number of jobs.
// When the queue is full and blocked on the writer, the chunkRefMap has one
// more job than the cap of the jobCh, because one job is currently being
// processed and blocked in the writer.
func (c *chunkWriteQueue) queueIsFull() bool {
	fullSize := cap(c.jobs) + 1
	return c.queueSize() == fullSize
}
// queueSize returns the current number of jobs in the queue.
// It looks at chunkRefMap instead of jobCh because a job is popped from the
// chan before it has been fully processed; it remains in the chunkRefMap
// until the processing is complete.
func (c *chunkWriteQueue) queueSize() int {
	c.chunkRefMapMtx.Lock()
	size := len(c.chunkRefMap)
	c.chunkRefMapMtx.Unlock()
	return size
}

View file

@ -146,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
} }
// The queue should be full. // The queue should be full.
require.True(t, queueIsFull(q)) require.True(t, q.queueIsFull())
// Adding another job should block as long as no job from the queue gets consumed. // Adding another job should block as long as no job from the queue gets consumed.
addedJob := atomic.NewBool(false) addedJob := atomic.NewBool(false)
@ -166,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10) require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10)
// The queue should be full again. // The queue should be full again.
require.True(t, queueIsFull(q)) require.True(t, q.queueIsFull())
// Consume <sizeLimit>+1 jobs from the queue. // Consume <sizeLimit>+1 jobs from the queue.
// To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job // To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job
// is already in the state of being processed. // is already in the state of being processed.
for job := 0; job < sizeLimit+1; job++ { for job := 0; job < sizeLimit+1; job++ {
require.False(t, queueIsEmpty(q)) require.False(t, q.queueIsEmpty())
unblockChunkWriter() unblockChunkWriter()
} }
// Wait until all jobs have been processed. // Wait until all jobs have been processed.
callbackWg.Wait() callbackWg.Wait()
require.True(t, queueIsEmpty(q)) require.True(t, q.queueIsEmpty())
} }
func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
@ -266,22 +266,3 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
}) })
} }
} }
// queueIsEmpty reports whether q has no jobs.
// It delegates to the chunkWriteQueue method rather than re-implementing the
// check, so the two stay consistent.
func queueIsEmpty(q *chunkWriteQueue) bool {
	return q.queueIsEmpty()
}
// queueIsFull reports whether q holds the maximum number of jobs.
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
// Delegates to the chunkWriteQueue method rather than duplicating that logic.
func queueIsFull(q *chunkWriteQueue) bool {
	return q.queueIsFull()
}
// queueSize returns the number of jobs currently in q.
// Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has
// been fully processed, it remains in the chunkRefMap until the processing is complete.
// Delegates to the chunkWriteQueue method rather than duplicating the locking logic.
func queueSize(q *chunkWriteQueue) int {
	return q.queueSize()
}

View file

@ -61,7 +61,7 @@ const (
CRCSize = 4 CRCSize = 4
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data. // MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
// Max because the uvarint size can be smaller. // Max because the uvarint size can be smaller.
MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + MaxChunkLengthFieldSize + CRCSize MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize
// MinWriteBufferSize is the minimum write buffer size allowed. // MinWriteBufferSize is the minimum write buffer size allowed.
MinWriteBufferSize = 64 * 1024 // 64KB. MinWriteBufferSize = 64 * 1024 // 64KB.
// MaxWriteBufferSize is the maximum write buffer size allowed. // MaxWriteBufferSize is the maximum write buffer size allowed.
@ -473,6 +473,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() {
cdm.evtlPos.cutFileOnNextChunk() cdm.evtlPos.cutFileOnNextChunk()
} }
// IsQueueEmpty reports whether the chunk write queue has no pending jobs.
func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
	empty := cdm.writeQueue.queueIsEmpty()
	return empty
}
// cutAndExpectRef creates a new m-mapped file. // cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this. // The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference, if not then it errors. // It ensures that the position in the new file matches the given chunk reference, if not then it errors.