diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go
index b635da5bc8..071c92c85d 100644
--- a/tsdb/chunks/chunk_write_queue.go
+++ b/tsdb/chunks/chunk_write_queue.go
@@ -29,6 +29,10 @@ const (
 
 	// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
 	chunkRefMapMinShrinkInterval = 10 * time.Minute
+
+	// Maximum size of a segment used by the job queue (number of elements). With chunkWriteJob being 64 bytes,
+	// this will use ~512 KiB even for an empty queue.
+	maxChunkQueueSegmentSize = 8192
 )
 
 type chunkWriteJob struct {
@@ -45,7 +49,7 @@ type chunkWriteJob struct {
 // Chunks that shall be written get added to the queue, which is consumed asynchronously.
 // Adding jobs to the queue is non-blocking as long as the queue isn't full.
 type chunkWriteQueue struct {
-	jobs chan chunkWriteJob
+	jobs *writeJobQueue
 
 	chunkRefMapMtx        sync.RWMutex
 	chunkRefMap           map[ChunkDiskMapperRef]chunkenc.Chunk
@@ -83,8 +87,13 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 		[]string{"operation"},
 	)
 
+	segmentSize := size
+	if segmentSize > maxChunkQueueSegmentSize {
+		segmentSize = maxChunkQueueSegmentSize
+	}
+
 	q := &chunkWriteQueue{
-		jobs:                  make(chan chunkWriteJob, size),
+		jobs:                  newWriteJobQueue(size, segmentSize),
 		chunkRefMap:           make(map[ChunkDiskMapperRef]chunkenc.Chunk),
 		chunkRefMapLastShrink: time.Now(),
 		writeChunk:            writeChunk,
@@ -108,7 +117,12 @@ func (c *chunkWriteQueue) start() {
 	go func() {
 		defer c.workerWg.Done()
 
-		for job := range c.jobs {
+		for {
+			job, ok := c.jobs.pop()
+			if !ok {
+				return
+			}
+
 			c.processJob(job)
 		}
 	}()
@@ -191,7 +205,14 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 	}
 	c.chunkRefMapMtx.Unlock()
 
-	c.jobs <- job
+	ok := c.jobs.push(job)
+	if !ok {
+		c.chunkRefMapMtx.Lock()
+		delete(c.chunkRefMap, job.ref)
+		c.chunkRefMapMtx.Unlock()
+
+		return errors.New("queue is closed")
+	}
 
 	return nil
 }
@@ -218,7 +239,7 @@ func (c *chunkWriteQueue) stop() {
 
 	c.isRunning = false
 
-	close(c.jobs)
+	c.jobs.close()
 
 	c.workerWg.Wait()
 }
@@ -230,7 +251,7 @@ func (c *chunkWriteQueue) queueIsEmpty() bool {
 func (c *chunkWriteQueue) queueIsFull() bool {
 	// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
 	// because one job is currently being processed and blocked in the writer.
-	return c.queueSize() == cap(c.jobs)+1
+	return c.queueSize() == c.jobs.maxSize+1
 }
 
 func (c *chunkWriteQueue) queueSize() int {
diff --git a/tsdb/chunks/queue.go b/tsdb/chunks/queue.go
new file mode 100644
index 0000000000..23b38e7f27
--- /dev/null
+++ b/tsdb/chunks/queue.go
@@ -0,0 +1,127 @@
+package chunks
+
+import "sync"
+
+// writeJobQueue is similar to a buffered channel of chunkWriteJob, but manages its own buffers
+// to avoid using a lot of memory when it's empty. It does that by storing elements into segments
+// of equal size (segmentSize). When a segment is no longer used, references to it are removed,
+// so it can be garbage collected.
+type writeJobQueue struct {
+	maxSize     int
+	segmentSize int
+
+	mtx            sync.Mutex            // protects all following variables
+	pushed, popped *sync.Cond            // signalled when something is pushed into the queue or popped from it
+	first, last    *writeJobQueueSegment // pointer to first and last segment, if any
+	size           int                   // total size of the queue
+	closed         bool                  // after closing the queue, nothing can be pushed to it
+}
+
+type writeJobQueueSegment struct {
+	segment             []chunkWriteJob
+	nextRead, nextWrite int                   // indexes of next read and next write in this segment
+	nextSegment         *writeJobQueueSegment // next segment, if any
+}
+
+func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue {
+	if maxSize <= 0 || segmentSize <= 0 {
+		panic("invalid queue")
+	}
+
+	q := &writeJobQueue{
+		maxSize:     maxSize,
+		segmentSize: segmentSize,
+	}
+
+	q.pushed = sync.NewCond(&q.mtx)
+	q.popped = sync.NewCond(&q.mtx)
+	return q
+}
+
+func (q *writeJobQueue) close() {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	q.closed = true
+
+	// unblock all blocked goroutines
+	q.pushed.Broadcast()
+	q.popped.Broadcast()
+}
+
+// push blocks until there is space available in the queue, and then adds the job to the queue.
+// If the queue is closed, or gets closed while waiting for space, push returns false.
+func (q *writeJobQueue) push(job chunkWriteJob) bool {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// wait until queue has more space or is closed
+	for !q.closed && q.size >= q.maxSize {
+		q.popped.Wait()
+	}
+
+	if q.closed {
+		return false
+	}
+
+	// Check if the last segment has more space for writing, and create a new one if not.
+	if q.last == nil || q.last.nextWrite >= q.segmentSize {
+		prevLast := q.last
+		q.last = &writeJobQueueSegment{
+			segment: make([]chunkWriteJob, q.segmentSize),
+		}
+
+		if prevLast != nil {
+			prevLast.nextSegment = q.last
+		}
+		if q.first == nil {
+			q.first = q.last
+		}
+	}
+
+	q.last.segment[q.last.nextWrite] = job
+	q.last.nextWrite++
+	q.size++
+	q.pushed.Signal()
+	return true
+}
+
+// pop returns the first job from the queue, and true.
+// If the queue is empty, pop blocks until a job is pushed (returns true), or until the queue is closed (returns false).
+// If the queue was already closed, pop first returns all remaining elements from the queue (with true), and only then returns false.
+func (q *writeJobQueue) pop() (chunkWriteJob, bool) {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	// wait until something is pushed to the queue, or queue is closed
+	for q.size == 0 {
+		if q.closed {
+			return chunkWriteJob{}, false
+		}
+
+		q.pushed.Wait()
+	}
+
+	res := q.first.segment[q.first.nextRead]
+	q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element
+	q.first.nextRead++
+	q.size--
+
+	// If we have read all possible elements from the first segment, we can drop it.
+	if q.first.nextRead >= q.segmentSize {
+		q.first = q.first.nextSegment
+		if q.first == nil {
+			q.last = nil
+		}
+	}
+
+	q.popped.Signal()
+	return res, true
+}
+
+func (q *writeJobQueue) length() int {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	return q.size
+}
diff --git a/tsdb/chunks/queue_test.go b/tsdb/chunks/queue_test.go
new file mode 100644
index 0000000000..008ef6d50b
--- /dev/null
+++ b/tsdb/chunks/queue_test.go
@@ -0,0 +1,310 @@
+package chunks
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+)
+
+func (q *writeJobQueue) assertInvariants(t *testing.T) {
+	q.mtx.Lock()
+	defer q.mtx.Unlock()
+
+	totalSize := 0
+	for s := q.first; s != nil; s = s.nextSegment {
+		require.True(t, s.segment != nil)
+
+		// Next read index is less than or equal to the next write index (we cannot read past written jobs).
+		require.True(t, s.nextRead <= s.nextWrite)
+
+		// Number of unread elements in this segment.
+		totalSize += s.nextWrite - s.nextRead
+
+		// First segment can be partially read; other segments have not been read yet.
+		if s == q.first {
+			require.True(t, s.nextRead >= 0)
+		} else {
+			require.True(t, s.nextRead == 0)
+		}
+
+		// If the first segment is empty (everything was read from it already), it must have extra capacity for
+		// additional elements, otherwise it would have been removed.
+		if s == q.first && s.nextRead == s.nextWrite {
+			require.True(t, s.nextWrite < len(s.segment))
+		}
+
+		// Segments in the middle are full.
+		if s != q.first && s != q.last {
+			require.True(t, s.nextWrite == len(s.segment))
+		}
+		// Last segment must have at least one element, or we wouldn't have created it.
+		require.True(t, s.nextWrite > 0)
+	}
+
+	require.Equal(t, q.size, totalSize)
+}
+
+func TestQueuePushPopSingleGoroutine(t *testing.T) {
+	seed := time.Now().UnixNano()
+	t.Log("seed:", seed)
+	r := rand.New(rand.NewSource(seed))
+
+	const maxSize = 500
+	const maxIters = 50
+
+	for max := 1; max < maxSize; max++ {
+		queue := newWriteJobQueue(max, 1+(r.Int()%max))
+
+		elements := 0 // total elements in the queue
+		lastWriteID := 0
+		lastReadID := 0
+
+		for iter := 0; iter < maxIters; iter++ {
+			if elements < max {
+				toWrite := r.Int() % (max - elements)
+				if toWrite == 0 {
+					toWrite = 1
+				}
+
+				for i := 0; i < toWrite; i++ {
+					lastWriteID++
+					require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(lastWriteID)}))
+
+					elements++
+				}
+			}
+
+			if elements > 0 {
+				toRead := r.Int() % elements
+				if toRead == 0 {
+					toRead = 1
+				}
+
+				for i := 0; i < toRead; i++ {
+					lastReadID++
+
+					j, b := queue.pop()
+					require.True(t, b)
+					require.Equal(t, HeadSeriesRef(lastReadID), j.seriesRef)
+
+					elements--
+				}
+			}
+
+			require.Equal(t, elements, queue.length())
+			queue.assertInvariants(t)
+		}
+	}
+}
+
+func TestQueuePushBlocksOnFullQueue(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	pushTime := make(chan time.Time)
+	go func() {
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 3}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 4}))
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 5}))
+		// This will block
+		pushTime <- time.Now()
+		require.True(t, queue.push(chunkWriteJob{seriesRef: 6}))
+		pushTime <- time.Now()
+	}()
+
+	before := <-pushTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-pushTime:
+		require.Fail(t, "didn't expect another push to proceed")
+	}
+
+	popTime := time.Now()
+	j, b := queue.pop()
+	require.True(t, b)
+	require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+	after := <-pushTime
+
+	require.True(t, after.After(popTime))
+	require.True(t, after.Sub(before) > delay)
+}
+
+func TestQueuePopBlocksOnEmptyQueue(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	popTime := make(chan time.Time)
+	go func() {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+		popTime <- time.Now()
+
+		// This will block
+		j, b = queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(2), j.seriesRef)
+
+		popTime <- time.Now()
+	}()
+
+	queue.push(chunkWriteJob{seriesRef: 1})
+
+	before := <-popTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-popTime:
+		require.Fail(t, "didn't expect another pop to proceed")
+	}
+
+	pushTime := time.Now()
+	require.True(t, queue.push(chunkWriteJob{seriesRef: 2}))
+
+	after := <-popTime
+
+	require.True(t, after.After(pushTime))
+	require.True(t, after.Sub(before) > delay)
+}
+
+func TestQueuePopUnblocksOnClose(t *testing.T) {
+	queue := newWriteJobQueue(5, 5)
+
+	popTime := make(chan time.Time)
+	go func() {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(1), j.seriesRef)
+
+		popTime <- time.Now()
+
+		// This will block until queue is closed.
+		j, b = queue.pop()
+		require.False(t, b)
+
+		popTime <- time.Now()
+	}()
+
+	queue.push(chunkWriteJob{seriesRef: 1})
+
+	before := <-popTime
+
+	delay := 100 * time.Millisecond
+	select {
+	case <-time.After(delay):
+		// ok
+	case <-popTime:
+		require.Fail(t, "didn't expect another pop to proceed")
+	}
+
+	closeTime := time.Now()
+	queue.close()
+
+	after := <-popTime
+
+	require.True(t, after.After(closeTime))
+	require.True(t, after.Sub(before) > delay)
+}
+
+func TestQueuePopAfterCloseReturnsAllElements(t *testing.T) {
+	const count = 10
+
+	queue := newWriteJobQueue(count, count)
+
+	for i := 0; i < count; i++ {
+		require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(i)}))
+	}
+
+	// Close the queue before popping all elements.
+	queue.close()
+
+	// No more pushing allowed after close.
+	require.False(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(11111)}))
+
+	// Verify that we can still read all pushed elements.
+	for i := 0; i < count; i++ {
+		j, b := queue.pop()
+		require.True(t, b)
+		require.Equal(t, HeadSeriesRef(i), j.seriesRef)
+	}
+
+	_, b := queue.pop()
+	require.False(t, b)
+}
+
+func TestQueuePushPopManyGoroutines(t *testing.T) {
+	const readGoroutines = 5
+	const writeGoroutines = 10
+	const writes = 500
+
+	queue := newWriteJobQueue(1024, 64)
+
+	// Reading goroutines.
+	refsMx := sync.Mutex{}
+	refs := map[HeadSeriesRef]bool{}
+
+	readersWG := sync.WaitGroup{}
+	for i := 0; i < readGoroutines; i++ {
+		readersWG.Add(1)
+
+		go func() {
+			defer readersWG.Done()
+
+			for j, ok := queue.pop(); ok; j, ok = queue.pop() {
+				refsMx.Lock()
+				refs[j.seriesRef] = true
+				refsMx.Unlock()
+			}
+		}()
+	}
+
+	id := atomic.Uint64{}
+
+	writersWG := sync.WaitGroup{}
+	for i := 0; i < writeGoroutines; i++ {
+		writersWG.Add(1)
+
+		go func() {
+			defer writersWG.Done()
+
+			for i := 0; i < writes; i++ {
+				ref := id.Inc()
+
+				require.True(t, queue.push(chunkWriteJob{seriesRef: HeadSeriesRef(ref)}))
+			}
+		}()
+	}
+
+	// Wait until all writes are done.
+	writersWG.Wait()
+
+	// Close the queue and wait for reading to be done.
+	queue.close()
+	readersWG.Wait()
+
+	// Check if we have all expected values.
+	require.Equal(t, writeGoroutines*writes, len(refs))
+}
+
+func TestQueueSegmentIsKeptEvenIfEmpty(t *testing.T) {
+	queue := newWriteJobQueue(1024, 64)
+
+	require.True(t, queue.push(chunkWriteJob{seriesRef: 1}))
+	_, b := queue.pop()
+	require.True(t, b)
+
+	require.NotNil(t, queue.first)
+	require.Equal(t, 1, queue.first.nextRead)
+	require.Equal(t, 1, queue.first.nextWrite)
+}
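Note: the snippet below is not part of the patch. It is an illustrative sketch of how the new writeJobQueue is intended to be used by a producer/consumer pair, mirroring what chunkWriteQueue.start and addJob do above; the function name exampleProducerConsumer and the chosen queue sizes are hypothetical.

package chunks

import "sync"

// exampleProducerConsumer (hypothetical) shows the push/pop/close lifecycle of writeJobQueue:
// push blocks while the queue holds maxSize elements and returns false once the queue is closed;
// pop blocks while the queue is empty and, after close(), keeps returning the remaining jobs
// before finally returning false.
func exampleProducerConsumer() {
	q := newWriteJobQueue(1000, 16) // maxSize=1000, segmentSize=16 (illustrative values)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Consumer: drain the queue until it is closed and empty.
		for job, ok := q.pop(); ok; job, ok = q.pop() {
			_ = job // process the job here
		}
	}()

	// Producer: enqueue jobs; this blocks if the queue is full.
	for i := 0; i < 100; i++ {
		q.push(chunkWriteJob{seriesRef: HeadSeriesRef(i)})
	}

	q.close() // no further pushes are accepted; the consumer drains what is left and exits
	wg.Wait()
}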