prometheus/tsdb/chunks/queue.go

128 lines
3.4 KiB
Go
Raw Normal View History

package chunks
import "sync"
// writeJobQueue is similar to buffered channel of chunkWriteJob, but manages its own buffers
// to avoid using a lot of memory when it's empty. It does that by storing elements into segments
// of equal size (segmentSize). When segment is not used anymore, reference to it are removed,
// so it can be treated as a garbage.
type writeJobQueue struct {
maxSize int
segmentSize int
mtx sync.Mutex // protects all following variables
pushed, popped *sync.Cond // signalled when something is pushed into the queue or popped from it
first, last *writeJobQueueSegment // pointer to first and last segment, if any
size int // total size of the queue
closed bool // after closing the queue, nothing can be pushed to it
}
type writeJobQueueSegment struct {
segment []chunkWriteJob
nextRead, nextWrite int // index of next read and next write in this segment.
nextSegment *writeJobQueueSegment // next segment, if any
}
func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue {
if maxSize <= 0 || segmentSize <= 0 {
panic("invalid queue")
}
q := &writeJobQueue{
maxSize: maxSize,
segmentSize: segmentSize,
}
q.pushed = sync.NewCond(&q.mtx)
q.popped = sync.NewCond(&q.mtx)
return q
}
func (q *writeJobQueue) close() {
q.mtx.Lock()
defer q.mtx.Unlock()
q.closed = true
// unblock all blocked goroutines
q.pushed.Broadcast()
q.popped.Broadcast()
}
// push blocks until there is space available in the queue, and then adds job to the queue.
// If queue is closed or gets closed while waiting for space, push returns false.
func (q *writeJobQueue) push(job chunkWriteJob) bool {
q.mtx.Lock()
defer q.mtx.Unlock()
// wait until queue has more space or is closed
for !q.closed && q.size >= q.maxSize {
q.popped.Wait()
}
if q.closed {
return false
}
// Check if this segment has more space for writing, and create new one if not.
if q.last == nil || q.last.nextWrite >= q.segmentSize {
prevLast := q.last
q.last = &writeJobQueueSegment{
segment: make([]chunkWriteJob, q.segmentSize),
}
if prevLast != nil {
prevLast.nextSegment = q.last
}
if q.first == nil {
q.first = q.last
}
}
q.last.segment[q.last.nextWrite] = job
q.last.nextWrite++
q.size++
q.pushed.Signal()
return true
}
// pop returns first job from the queue, and true.
// if queue is empty, pop blocks until there is a job (returns true), or until queue is closed (returns false).
// If queue was already closed, pop first returns all remaining elements from the queue (with true value), and only then returns false.
func (q *writeJobQueue) pop() (chunkWriteJob, bool) {
q.mtx.Lock()
defer q.mtx.Unlock()
// wait until something is pushed to the queue, or queue is closed.
for q.size == 0 {
if q.closed {
return chunkWriteJob{}, false
}
q.pushed.Wait()
}
res := q.first.segment[q.first.nextRead]
q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element
q.first.nextRead++
q.size--
// If we have read all possible elements from first segment, we can drop it.
if q.first.nextRead >= q.segmentSize {
q.first = q.first.nextSegment
if q.first == nil {
q.last = nil
}
}
q.popped.Signal()
return res, true
}
func (q *writeJobQueue) length() int {
q.mtx.Lock()
defer q.mtx.Unlock()
return q.size
}