Merge pull request #279 from prometheus/refactor/storage-semaphore

Cleanups in tiered storage and apply semaphores.
This commit is contained in:
Matt T. Proud 2013-06-06 09:16:53 -07:00
commit 6b2be5024e

View file

@ -17,7 +17,6 @@ import (
"fmt"
"log"
"sort"
"sync"
"time"
dto "github.com/prometheus/prometheus/model/generated"
@ -27,6 +26,7 @@ import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"sync"
)
type chunk model.Values
@ -65,6 +65,7 @@ const (
// TieredStorage both persists samples and generates materialized views for
// queries.
type TieredStorage struct {
// mu is purely used for state transitions.
mu sync.RWMutex
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
@ -81,6 +82,9 @@ type TieredStorage struct {
draining chan chan<- bool
state tieredStorageState
memorySemaphore chan bool
diskSemaphore chan bool
}
// viewJob encapsulates a request to extract sample values from the datastore.
@ -92,13 +96,18 @@ type viewJob struct {
stats *stats.TimerGroup
}
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
const (
tieredDiskSemaphores = 1
tieredMemorySemaphores = 5
)
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (*TieredStorage, error) {
diskStorage, err := NewLevelDBMetricPersistence(root)
if err != nil {
return
return nil, err
}
storage = &TieredStorage{
s := &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage,
draining: make(chan chan<- bool),
@ -106,8 +115,19 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
diskSemaphore: make(chan bool, tieredDiskSemaphores),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
}
return
for i := 0; i < tieredDiskSemaphores; i++ {
s.diskSemaphore <- true
}
for i := 0; i < tieredMemorySemaphores; i++ {
s.memorySemaphore <- true
}
return s, nil
}
// Enqueues Samples for storage.
@ -206,7 +226,8 @@ func (t *TieredStorage) Serve(started chan<- bool) {
t.flushMemory(t.memoryTTL)
case viewRequest := <-t.viewQueue:
viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
t.renderView(viewRequest)
<-t.memorySemaphore
go t.renderView(viewRequest)
case drainingDone := <-t.draining:
t.Flush()
drainingDone <- true
@ -303,6 +324,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
var err error
begin := time.Now()
defer func() {
t.memorySemaphore <- true
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
@ -341,6 +364,11 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Conditionalize disk access.
if diskFrontier == nil && diskPresent {
if iterator == nil {
<-t.diskSemaphore
defer func() {
t.diskSemaphore <- true
}()
// Get a single iterator that will be used for all data extraction
// below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)