diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go
index b635da5bc..ab34eb06c 100644
--- a/tsdb/chunks/chunk_write_queue.go
+++ b/tsdb/chunks/chunk_write_queue.go
@@ -24,11 +24,15 @@ import (
 )
 
 const (
-	// Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again.
+	// Minimum recorded peak since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again.
 	chunkRefMapShrinkThreshold = 1000
 
 	// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
 	chunkRefMapMinShrinkInterval = 10 * time.Minute
+
+	// Maximum size of segment used by job queue (number of elements). With chunkWriteJob being 64 bytes,
+	// this will use ~512 KiB for empty queue.
+	maxChunkQueueSegmentSize = 8192
 )
 
 type chunkWriteJob struct {
@@ -45,7 +49,7 @@ type chunkWriteJob struct {
 // Chunks that shall be written get added to the queue, which is consumed asynchronously.
 // Adding jobs to the queue is non-blocking as long as the queue isn't full.
 type chunkWriteQueue struct {
-	jobs chan chunkWriteJob
+	jobs *writeJobQueue
 
 	chunkRefMapMtx sync.RWMutex
 	chunkRefMap    map[ChunkDiskMapperRef]chunkenc.Chunk
@@ -83,8 +87,13 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 		[]string{"operation"},
 	)
 
+	segmentSize := size
+	if segmentSize > maxChunkQueueSegmentSize {
+		segmentSize = maxChunkQueueSegmentSize
+	}
+
 	q := &chunkWriteQueue{
-		jobs:                  make(chan chunkWriteJob, size),
+		jobs:                  newWriteJobQueue(size, segmentSize),
 		chunkRefMap:           make(map[ChunkDiskMapperRef]chunkenc.Chunk),
 		chunkRefMapLastShrink: time.Now(),
 		writeChunk:            writeChunk,
@@ -108,7 +117,12 @@ func (c *chunkWriteQueue) start() {
 	go func() {
 		defer c.workerWg.Done()
 
-		for job := range c.jobs {
+		for {
+			job, ok := c.jobs.pop()
+			if !ok {
+				return
+			}
+
 			c.processJob(job)
 		}
 	}()
@@ -191,7 +205,13 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 	}
 	c.chunkRefMapMtx.Unlock()
 
-	c.jobs <- job
+	if ok := c.jobs.push(job); !ok {
+		c.chunkRefMapMtx.Lock()
+		delete(c.chunkRefMap, job.ref)
+		c.chunkRefMapMtx.Unlock()
+
+		return errors.New("queue is closed")
+	}
 
 	return nil
 }
@@ -218,7 +238,7 @@ func (c *chunkWriteQueue) stop() {
 
 	c.isRunning = false
 
-	close(c.jobs)
+	c.jobs.close()
 
 	c.workerWg.Wait()
 }
@@ -230,7 +250,7 @@ func (c *chunkWriteQueue) queueIsEmpty() bool {
 func (c *chunkWriteQueue) queueIsFull() bool {
 	// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
 	// because one job is currently being processed and blocked in the writer.
-	return c.queueSize() == cap(c.jobs)+1
+	return c.queueSize() == c.jobs.maxSize+1
 }
 
 func (c *chunkWriteQueue) queueSize() int {
diff --git a/tsdb/chunks/queue.go b/tsdb/chunks/queue.go
new file mode 100644
index 000000000..860381a5f
--- /dev/null
+++ b/tsdb/chunks/queue.go
@@ -0,0 +1,141 @@
+// Copyright 2022 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunks
+
+import "sync"
+
+// writeJobQueue is similar to a buffered channel of chunkWriteJob, but it manages its own buffers
+// to avoid using a lot of memory when it's empty. It does that by storing elements into segments
+// of equal size (segmentSize). When a segment is no longer used, references to it are removed,
+// so it can be garbage-collected.
+type writeJobQueue struct {
+	maxSize     int
+	segmentSize int
+
+	mtx            sync.Mutex            // protects all following variables
+	pushed, popped *sync.Cond            // signalled when something is pushed into the queue or popped from it
+	first, last    *writeJobQueueSegment // pointer to first and last segment, if any
+	size           int                   // total size of the queue
+	closed         bool                  // after closing the queue, nothing can be pushed to it
+}
+
+type writeJobQueueSegment struct {
+	segment             []chunkWriteJob
+	nextRead, nextWrite int                   // index of next read and next write in this segment
+	nextSegment         *writeJobQueueSegment // next segment, if any
+}
+
+func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue {
+	if maxSize <= 0 || segmentSize <= 0 {
+		panic("invalid queue")
+	}
+
+	q := &writeJobQueue{
+		maxSize:     maxSize,
+		segmentSize: segmentSize,
+	}
+
+	q.pushed = sync.NewCond(&q.mtx)
+	q.popped = sync.NewCond(&q.mtx)
+	return q
+}
+
+func (q *writeJobQueue) close() {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	q.closed = true
+
+	// Unblock all blocked goroutines.
+	q.pushed.Broadcast()
+	q.popped.Broadcast()
+}
+
+// push blocks until there is space available in the queue, and then adds the job to the queue.
+// If the queue is closed, or gets closed while waiting for space, push returns false.
+func (q *writeJobQueue) push(job chunkWriteJob) bool {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// Wait until the queue has more space, or is closed.
+	for !q.closed && q.size >= q.maxSize {
+		q.popped.Wait()
+	}
+
+	if q.closed {
+		return false
+	}
+
+	// Check if the last segment has more space for writing, and create a new one if not.
+	if q.last == nil || q.last.nextWrite >= q.segmentSize {
+		prevLast := q.last
+		q.last = &writeJobQueueSegment{
+			segment: make([]chunkWriteJob, q.segmentSize),
+		}
+
+		if prevLast != nil {
+			prevLast.nextSegment = q.last
+		}
+		if q.first == nil {
+			q.first = q.last
+		}
+	}
+
+	q.last.segment[q.last.nextWrite] = job
+	q.last.nextWrite++
+	q.size++
+	q.pushed.Signal()
+	return true
+}
+
+// pop returns the first job from the queue, and true.
+// If the queue is empty, pop blocks until there is a job (returns true), or until the queue is closed (returns false).
+// If the queue was already closed, pop first returns all remaining elements from the queue (with true), and only then returns false.
+func (q *writeJobQueue) pop() (chunkWriteJob, bool) {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// Wait until something is pushed to the queue, or the queue is closed.
+	for q.size == 0 {
+		if q.closed {
+			return chunkWriteJob{}, false
+		}
+
+		q.pushed.Wait()
+	}
+
+	res := q.first.segment[q.first.nextRead]
+	q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element
+	q.first.nextRead++
+	q.size--
+
+	// If we have read all possible elements from the first segment, we can drop it.
+	if q.first.nextRead >= q.segmentSize {
+		q.first = q.first.nextSegment
+		if q.first == nil {
+			q.last = nil
+		}
+	}
+
+	q.popped.Signal()
+	return res, true
+}
+
+// length returns the number of jobs in the queue.
+func (q *writeJobQueue) length() int {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	return q.size
+}
diff --git a/tsdb/chunks/queue_test.go b/tsdb/chunks/queue_test.go
new file mode 100644
index 000000000..e678a040b
--- /dev/null
+++ b/tsdb/chunks/queue_test.go
@@ -0,0 +1,323 @@
+// Copyright 2022 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunks
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+)
+
+func (q *writeJobQueue) assertInvariants(t *testing.T) {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	totalSize := 0
+	for s := q.first; s != nil; s = s.nextSegment {
+		require.True(t, s.segment != nil)
+
+		// Next read index is less than or equal to the next write index (we cannot read past written jobs).
+		require.True(t, s.nextRead <= s.nextWrite)
+
+		// Number of unread elements in this segment.
+		totalSize += s.nextWrite - s.nextRead
+
+		// First segment can be partially read, other segments were not read yet.
+		if s == q.first {
+			require.True(t, s.nextRead >= 0)
+		} else {
+			require.True(t, s.nextRead == 0)
+		}
+
+		// If the first segment is empty (everything was read from it already), it must have extra capacity for
+		// additional elements, otherwise it would have been removed.
+		if s == q.first && s.nextRead == s.nextWrite {
+			require.True(t, s.nextWrite < len(s.segment))
+		}
+
+		// Segments in the middle are full.
+		if s != q.first && s != q.last {
+			require.True(t, s.nextWrite == len(s.segment))
+		}
+		// Last segment must have at least one element, or we wouldn't have created it.
+		require.True(t, s.nextWrite > 0)
+	}
+
+	require.Equal(t, q.size, totalSize)
+}
+
+func TestQueuePushPopSingleGoroutine(t *testing.T) {
+	seed := time.Now().UnixNano()
+	t.Log("seed:", seed)
+	r := rand.New(rand.NewSource(seed))
+
+	const maxSize = 500
+	const maxIters = 50
+
+	for max := 1; max < maxSize; max++ {
+		queue := newWriteJobQueue(max, 1+(r.Int()%max))
+
+		elements := 0 // total elements in the queue
+		lastWriteID := 0
+		lastReadID := 0
+
+		for iter := 0; iter < maxIters; iter++ {
+			if elements < max {
+				toWrite := r.Int() % (max - elements)
+				if toWrite == 0 {
+					toWrite = 1
+				}
+
+				for i := 0; i < toWrite; i++ {
+					lastWriteID++
+					require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(lastWriteID)}))
+
+					elements++
+				}
+			}
+
+			if elements > 0 {
+				toRead := r.Int() % elements
+				if toRead == 0 {
+					toRead = 1
+				}
+
+				for i := 0; i < toRead; i++ {
+					lastReadID++
+
+					j, b := queue.pop()
+					require.True(t, b)
+					require.Equal(t, HeadSeriesRef(lastReadID), j.seriesRef)
+
+					elements--
+				}
+			}
+
+			require.Equal(t, elements, queue.length())
+			queue.assertInvariants(t)
+		}
+	}
+}
+
+func TestQueuePushBlocksOnFullQueue(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	pushTime := make(chan time.Time)
+	go func() {
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 3}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 4}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 5}))
+		pushTime <- time.Now()
+		// This will block
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 6}))
+		pushTime <- time.Now()
+	}()
+
+	timeBeforePush := <-pushTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-pushTime:
+		require.Fail(t, "didn't expect another push to proceed")
+	}
+
+	popTime := time.Now()
+	j, b := queue.pop()
+	require.True(t, b)
+	require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+	timeAfterPush := <-pushTime
+
+	require.GreaterOrEqual(t, timeAfterPush.Sub(popTime), time.Duration(0))
+	require.GreaterOrEqual(t, timeAfterPush.Sub(timeBeforePush), delay)
+}
+
+func TestQueuePopBlocksOnEmptyQueue(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	popTime := make(chan time.Time)
+	go func() {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+		popTime <- time.Now()
+
+		// This will block
+		j, b = queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(2), j.seriesRef)
+
+		popTime <- time.Now()
+	}()
+
+	queue.push(chunkWriteJob{seriesRef: 1})
+
+	timeBeforePop := <-popTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-popTime:
+		require.Fail(t, "didn't expect another pop to proceed")
+	}
+
+	pushTime := time.Now()
+	require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
+
+	timeAfterPop := <-popTime
+
+	require.GreaterOrEqual(t, timeAfterPop.Sub(pushTime), time.Duration(0))
+	require.Greater(t, timeAfterPop.Sub(timeBeforePop), delay)
+}
+
+func TestQueuePopUnblocksOnClose(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	popTime := make(chan time.Time)
+	go func() {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+		popTime <- time.Now()
+
+		// This will block until queue is closed.
+		j, b = queue.pop()
+		require.False(t, b)
+
+		popTime <- time.Now()
+	}()
+
+	queue.push(chunkWriteJob{seriesRef: 1})
+
+	timeBeforePop := <-popTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-popTime:
+		require.Fail(t, "didn't expect another pop to proceed")
+	}
+
+	closeTime := time.Now()
+	queue.close()
+
+	timeAfterPop := <-popTime
+
+	require.GreaterOrEqual(t, timeAfterPop.Sub(closeTime), time.Duration(0))
+	require.GreaterOrEqual(t, timeAfterPop.Sub(timeBeforePop), delay)
+}
+
+func TestQueuePopAfterCloseReturnsAllElements(t *testing.T) {
+	const count = 10
+
+	queue := newWriteJobQueue(count, count)
+
+	for i := 0; i < count; i++ {
+		require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(i)}))
+	}
+
+	// Close the queue before popping all elements.
+	queue.close()
+
+	// No more pushing allowed after close.
+	require.False(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(11111)}))
+
+	// Verify that we can still read all pushed elements.
+	for i := 0; i < count; i++ {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(i), j.seriesRef)
+	}
+
+	_, b := queue.pop()
+	require.False(t, b)
+}
+
+func TestQueuePushPopManyGoroutines(t *testing.T) {
+	const readGoroutines = 5
+	const writeGoroutines = 10
+	const writes = 500
+
+	queue := newWriteJobQueue(1024, 64)
+
+	// Reading goroutines.
+	refsMx := sync.Mutex{}
+	refs := map[HeadSeriesRef]bool{}
+
+	readersWG := sync.WaitGroup{}
+	for i := 0; i < readGoroutines; i++ {
+		readersWG.Add(1)
+
+		go func() {
+			defer readersWG.Done()
+
+			for j, ok := queue.pop(); ok; j, ok = queue.pop() {
+				refsMx.Lock()
+				refs[j.seriesRef] = true
+				refsMx.Unlock()
+			}
+		}()
+	}
+
+	id := atomic.Uint64{}
+
+	writersWG := sync.WaitGroup{}
+	for i := 0; i < writeGoroutines; i++ {
+		writersWG.Add(1)
+
+		go func() {
+			defer writersWG.Done()
+
+			for i := 0; i < writes; i++ {
+				ref := id.Inc()
+
+				require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(ref)}))
+			}
+		}()
+	}
+
+	// Wait until all writes are done.
+	writersWG.Wait()
+
+	// Close the queue and wait for reading to be done.
+	queue.close()
+	readersWG.Wait()
+
+	// Check that we have all expected values.
+	require.Equal(t, writeGoroutines*writes, len(refs))
+}
+
+func TestQueueSegmentIsKeptEvenIfEmpty(t *testing.T) {
+	queue := newWriteJobQueue(1024, 64)
+
+	require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
+	_, b := queue.pop()
+	require.True(t, b)
+
+	require.NotNil(t, queue.first)
+	require.Equal(t, 1, queue.first.nextRead)
+	require.Equal(t, 1, queue.first.nextWrite)
+}
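
For illustration only, and not part of the change above: a minimal, self-contained sketch of the segmented-buffer idea that queue.go implements, reduced to an int payload and single-goroutine use, with the mutex, condition variables, maximum size, and close handling omitted. The names segQueue and segment below are hypothetical and exist only for this example.

package main

import "fmt"

// segment holds a fixed-size slice of values plus read/write cursors, mirroring writeJobQueueSegment.
type segment struct {
	buf                 []int
	nextRead, nextWrite int
	next                *segment
}

// segQueue chains segments together, so memory use is proportional to the number of queued
// elements rather than to the maximum queue size (the property the change above is after).
type segQueue struct {
	segmentSize int
	first, last *segment
	size        int
}

func (q *segQueue) push(v int) {
	// Start a new segment when there is none yet or the last one is full.
	if q.last == nil || q.last.nextWrite >= q.segmentSize {
		s := &segment{buf: make([]int, q.segmentSize)}
		if q.last != nil {
			q.last.next = s
		}
		if q.first == nil {
			q.first = s
		}
		q.last = s
	}
	q.last.buf[q.last.nextWrite] = v
	q.last.nextWrite++
	q.size++
}

func (q *segQueue) pop() (int, bool) {
	if q.size == 0 {
		return 0, false
	}
	v := q.first.buf[q.first.nextRead]
	q.first.nextRead++
	q.size--
	// Drop the first segment once it has been fully read, so it can be garbage-collected.
	if q.first.nextRead >= q.segmentSize {
		q.first = q.first.next
		if q.first == nil {
			q.last = nil
		}
	}
	return v, true
}

func main() {
	q := &segQueue{segmentSize: 4}
	for i := 1; i <= 10; i++ {
		q.push(i)
	}
	for v, ok := q.pop(); ok; v, ok = q.pop() {
		fmt.Println(v) // prints 1 through 10 in order
	}
}

The real writeJobQueue wraps these two operations in a mutex plus pushed/popped condition variables, so that push can block once size reaches maxSize, and pop can block on an empty queue or drain the remaining elements after close.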