Introduce semaphores for tiered storage.

This commit wraps the tiered storage access componnets in semaphores,
since we can handle several concurrent memory reads.
This commit is contained in:
Matt T. Proud 2013-06-05 10:40:39 +02:00
parent 0d46f6b42a
commit ef1d5fd8a2

View file

@ -17,7 +17,6 @@ import (
"fmt" "fmt"
"log" "log"
"sort" "sort"
"sync"
"time" "time"
dto "github.com/prometheus/prometheus/model/generated" dto "github.com/prometheus/prometheus/model/generated"
@ -27,6 +26,7 @@ import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/stats" "github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb" "github.com/prometheus/prometheus/storage/raw/leveldb"
"sync"
) )
type chunk model.Values type chunk model.Values
@ -65,6 +65,7 @@ const (
// TieredStorage both persists samples and generates materialized views for // TieredStorage both persists samples and generates materialized views for
// queries. // queries.
type TieredStorage struct { type TieredStorage struct {
// mu is purely used for state transitions.
mu sync.RWMutex mu sync.RWMutex
// BUG(matt): This introduces a Law of Demeter violation. Ugh. // BUG(matt): This introduces a Law of Demeter violation. Ugh.
@ -81,6 +82,9 @@ type TieredStorage struct {
draining chan chan<- bool draining chan chan<- bool
state tieredStorageState state tieredStorageState
memorySemaphore chan bool
diskSemaphore chan bool
} }
// viewJob encapsulates a request to extract sample values from the datastore. // viewJob encapsulates a request to extract sample values from the datastore.
@ -92,13 +96,18 @@ type viewJob struct {
stats *stats.TimerGroup stats *stats.TimerGroup
} }
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) { const (
tieredDiskSemaphores = 1
tieredMemorySemaphores = 5
)
func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, memoryTTL time.Duration, root string) (*TieredStorage, error) {
diskStorage, err := NewLevelDBMetricPersistence(root) diskStorage, err := NewLevelDBMetricPersistence(root)
if err != nil { if err != nil {
return return nil, err
} }
storage = &TieredStorage{ s := &TieredStorage{
appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth), appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
DiskStorage: diskStorage, DiskStorage: diskStorage,
draining: make(chan chan<- bool), draining: make(chan chan<- bool),
@ -106,8 +115,19 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
memoryArena: NewMemorySeriesStorage(), memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL, memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth), viewQueue: make(chan viewJob, viewQueueDepth),
diskSemaphore: make(chan bool, tieredDiskSemaphores),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
} }
return
for i := 0; i < tieredDiskSemaphores; i++ {
s.diskSemaphore <- true
}
for i := 0; i < tieredMemorySemaphores; i++ {
s.memorySemaphore <- true
}
return s, nil
} }
// Enqueues Samples for storage. // Enqueues Samples for storage.
@ -206,7 +226,8 @@ func (t *TieredStorage) Serve(started chan<- bool) {
t.flushMemory(t.memoryTTL) t.flushMemory(t.memoryTTL)
case viewRequest := <-t.viewQueue: case viewRequest := <-t.viewQueue:
viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop() viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
t.renderView(viewRequest) <-t.memorySemaphore
go t.renderView(viewRequest)
case drainingDone := <-t.draining: case drainingDone := <-t.draining:
t.Flush() t.Flush()
drainingDone <- true drainingDone <- true
@ -303,6 +324,8 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
var err error var err error
begin := time.Now() begin := time.Now()
defer func() { defer func() {
t.memorySemaphore <- true
duration := time.Since(begin) duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure}) recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
@ -341,6 +364,11 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Conditionalize disk access. // Conditionalize disk access.
if diskFrontier == nil && diskPresent { if diskFrontier == nil && diskPresent {
if iterator == nil { if iterator == nil {
<-t.diskSemaphore
defer func() {
t.diskSemaphore <- true
}()
// Get a single iterator that will be used for all data extraction // Get a single iterator that will be used for all data extraction
// below. // below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true) iterator = t.DiskStorage.MetricSamples.NewIterator(true)