Merge pull request #232 from prometheus/optimize/granular-storage-locking

Synchronous memory appends and more fine-grained storage locks.
juliusv 2013-05-13 10:11:57 -07:00
commit 92ad65ff13
5 changed files with 46 additions and 75 deletions
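
In short: samples no longer sit in a buffered appendToMemoryQueue channel until a ticker-driven writeMemory() pass drains them into the memory arena; AppendSamples() now writes them into the arena synchronously under a write lock. The standalone sketch below illustrates that shape only; sampleStore, its fields, and the simplified Sample type are illustrative stand-ins, not the actual Prometheus types.

package main

import (
	"fmt"
	"sync"
)

// Sample is a stand-in for model.Sample; the real type also carries the
// metric and a timestamp.
type Sample struct {
	Value float64
}

// sampleStore is a toy analogue of the in-memory arena: appends happen
// synchronously under a write lock instead of flowing through a queue
// that a ticker-driven goroutine drains later.
type sampleStore struct {
	mtx     sync.RWMutex
	samples []Sample
}

// AppendSamples mirrors the new behavior: the caller blocks only for the
// duration of the in-memory append, and no background drain pass is needed.
func (s *sampleStore) AppendSamples(in []Sample) {
	s.mtx.Lock()
	s.samples = append(s.samples, in...)
	s.mtx.Unlock()
}

// Len reads under the read lock, so reads run concurrently with each
// other but not with an in-flight append.
func (s *sampleStore) Len() int {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return len(s.samples)
}

func main() {
	store := &sampleStore{}
	store.AppendSamples([]Sample{{Value: 1}, {Value: 2}})
	fmt.Println("samples in memory:", store.Len()) // samples in memory: 2
}

Because callers now block only for the short in-memory append, the memory append queue, the capacity argument that sized it, the ticker-driven writeMemory() pass, and the corresponding queue-size metrics all disappear in the hunks below.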

View file

@@ -160,7 +160,7 @@ func main() {
 		log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
 	}
-	ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
+	ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
 	if err != nil {
 		log.Fatalf("Error opening storage: %s", err)
 	}

View file

@@ -33,7 +33,7 @@ type MetricPersistence interface {
 	// Record a new sample in the storage layer.
 	AppendSample(model.Sample) error
-	// Record a new sample in the storage layer.
+	// Record a group of new samples in the storage layer.
 	AppendSamples(model.Samples) error
 	// Get all of the metric fingerprints that are associated with the provided

View file

@@ -26,10 +26,6 @@ var (
 	testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
 )
-const (
-	appendQueueSize = 100
-)
 func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {
 	err := p.AppendSample(s)
 	if err != nil {
@@ -90,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
 func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
 	var directory test.TemporaryDirectory
 	directory = test.NewTemporaryDirectory("test_tiered_storage", t)
-	storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
+	storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
 	if err != nil {
 		if storage != nil {

View file

@@ -60,13 +60,17 @@ type TieredStorage struct {
 	DiskStorage *LevelDBMetricPersistence
 	appendToDiskQueue chan model.Samples
-	appendToMemoryQueue chan model.Samples
 	diskFrontier *diskFrontier
 	draining chan chan bool
 	flushMemoryInterval time.Duration
 	memoryArena memorySeriesStorage
 	memoryTTL time.Duration
-	mutex sync.Mutex
+	// This mutex manages any concurrent reads/writes of the memoryArena.
+	memoryMutex sync.RWMutex
+	// This mutex blocks only deletions from the memoryArena. It is held for a
+	// potentially long time for an entire renderView() duration, since we depend
+	// on no samples being removed from memory after grabbing a LevelDB snapshot.
+	memoryDeleteMutex sync.RWMutex
 	viewQueue chan viewJob
 	writeMemoryInterval time.Duration
 }
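
The two comments above define the locking discipline used throughout the rest of the diff: AppendSamples() takes memoryMutex as a writer, flushMemory() and the per-scan memory reads in renderView() take it as readers, while memoryDeleteMutex is write-locked only around deletions in the flusher visitor and read-locked for the whole of renderView(). Below is a minimal self-contained sketch of that discipline; arena, appendSample, renderView, and deleteOld are illustrative names, and the deleter here takes both mutexes in write mode to keep the toy race-free, which simplifies what the real flusher does.

package main

import (
	"fmt"
	"sync"
)

// arena mimics the memoryArena guarded by the two mutexes above.
type arena struct {
	memoryMutex       sync.RWMutex // any reads/writes of values
	memoryDeleteMutex sync.RWMutex // only deletions, vs. long-running renders
	values            map[int]float64
}

// appendSample is the writer path: it needs only the memory mutex.
func (a *arena) appendSample(k int, v float64) {
	a.memoryMutex.Lock()
	a.values[k] = v
	a.memoryMutex.Unlock()
}

// renderView holds the delete mutex in read mode for its whole duration,
// so samples can still be appended and read, but not deleted, meanwhile.
func (a *arena) renderView(keys []int) []float64 {
	a.memoryDeleteMutex.RLock()
	defer a.memoryDeleteMutex.RUnlock()

	out := make([]float64, 0, len(keys))
	for _, k := range keys {
		a.memoryMutex.RLock() // short read-side critical section per lookup
		v := a.values[k]
		a.memoryMutex.RUnlock()
		out = append(out, v)
	}
	return out
}

// deleteOld evicts entries one at a time; each eviction waits for any
// in-flight render (delete mutex) and excludes readers of the map itself
// (memory mutex) while it mutates it.
func (a *arena) deleteOld(keys []int) {
	for _, k := range keys {
		a.memoryDeleteMutex.Lock()
		a.memoryMutex.Lock()
		delete(a.values, k)
		a.memoryMutex.Unlock()
		a.memoryDeleteMutex.Unlock()
	}
}

func main() {
	a := &arena{values: map[int]float64{}}
	a.appendSample(1, 42)
	fmt.Println(a.renderView([]int{1})) // [42]
	a.deleteOld([]int{1})
	fmt.Println(a.renderView([]int{1})) // [0] (missing keys read as zero)
}
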
@@ -79,7 +83,7 @@ type viewJob struct {
 	err chan error
 }
-func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
+func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
 	diskStorage, err := NewLevelDBMetricPersistence(root)
 	if err != nil {
 		return
@@ -87,7 +91,6 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueu
 	storage = &TieredStorage{
 		appendToDiskQueue: make(chan model.Samples, appendToDiskQueueDepth),
-		appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
 		DiskStorage: diskStorage,
 		draining: make(chan chan bool),
 		flushMemoryInterval: flushMemoryInterval,
@@ -100,12 +103,14 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueu
 }
 // Enqueues Samples for storage.
-func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
+func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
 	if len(t.draining) > 0 {
 		return fmt.Errorf("Storage is in the process of draining.")
 	}
-	t.appendToMemoryQueue <- s
+	t.memoryMutex.Lock()
+	t.memoryArena.AppendSamples(samples)
+	t.memoryMutex.Unlock()
 	return
 }
@@ -175,8 +180,6 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
 func (t *TieredStorage) Serve() {
 	flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
 	defer flushMemoryTicker.Stop()
-	writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
-	defer writeMemoryTicker.Stop()
 	reportTicker := time.NewTicker(time.Second)
 	defer reportTicker.Stop()
@@ -188,14 +191,12 @@
 	for {
 		select {
-		case <-writeMemoryTicker.C:
-			t.writeMemory()
 		case <-flushMemoryTicker.C:
 			t.flushMemory()
 		case viewRequest := <-t.viewQueue:
 			t.renderView(viewRequest)
 		case drainingDone := <-t.draining:
-			t.flush()
+			t.Flush()
 			drainingDone <- true
 			return
 		}
@@ -206,33 +207,12 @@ func (t *TieredStorage) reportQueues() {
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
-	queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
-	queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
 	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
 	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
 }
-func (t *TieredStorage) writeMemory() {
-	begin := time.Now()
-	defer func() {
-		duration := time.Since(begin)
-		recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
-	}()
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	pendingLength := len(t.appendToMemoryQueue)
-	for i := 0; i < pendingLength; i++ {
-		t.memoryArena.AppendSamples(<-t.appendToMemoryQueue)
-	}
-}
 func (t *TieredStorage) Flush() {
-	t.flush()
+	t.flushMemory()
 }
 func (t *TieredStorage) Close() {
@@ -242,31 +222,23 @@ func (t *TieredStorage) Close() {
 	t.memoryArena.Close()
 	close(t.appendToDiskQueue)
-	close(t.appendToMemoryQueue)
 	close(t.viewQueue)
 	log.Println("Done.")
 }
-// Write all pending appends.
-func (t *TieredStorage) flush() (err error) {
-	// Trim any old values to reduce iterative write costs.
-	t.flushMemory()
-	t.writeMemory()
-	t.flushMemory()
-	return
-}
 type memoryToDiskFlusher struct {
-	toDiskQueue    chan model.Samples
-	disk           MetricPersistence
-	olderThan      time.Time
-	valuesAccepted int
-	valuesRejected int
+	toDiskQueue       chan model.Samples
+	disk              MetricPersistence
+	olderThan         time.Time
+	valuesAccepted    int
+	valuesRejected    int
+	memoryDeleteMutex *sync.RWMutex
 }
 type memoryToDiskFlusherVisitor struct {
-	stream  stream
-	flusher *memoryToDiskFlusher
+	stream            stream
+	flusher           *memoryToDiskFlusher
+	memoryDeleteMutex *sync.RWMutex
 }
 func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
@@ -311,15 +283,18 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag
 		},
 	}
+	f.memoryDeleteMutex.Lock()
 	f.stream.values.Delete(skipListTime(recordTime))
+	f.memoryDeleteMutex.Unlock()
 	return
 }
 func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
 	visitor := memoryToDiskFlusherVisitor{
-		stream:  stream,
-		flusher: f,
+		stream:            stream,
+		flusher:           f,
+		memoryDeleteMutex: f.memoryDeleteMutex,
 	}
 	return visitor, visitor, visitor
@@ -338,7 +313,7 @@ func (f memoryToDiskFlusher) Close() {
 	f.Flush()
 }
-// Persist a whole bunch of samples to the datastore.
+// Persist a whole bunch of samples from memory to the datastore.
 func (t *TieredStorage) flushMemory() {
 	begin := time.Now()
 	defer func() {
@@ -347,13 +322,14 @@
 		recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
 	}()
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
+	t.memoryMutex.RLock()
+	defer t.memoryMutex.RUnlock()
 	flusher := &memoryToDiskFlusher{
-		disk:        t.DiskStorage,
-		olderThan:   time.Now().Add(-1 * t.memoryTTL),
-		toDiskQueue: t.appendToDiskQueue,
+		disk:              t.DiskStorage,
+		olderThan:         time.Now().Add(-1 * t.memoryTTL),
+		toDiskQueue:       t.appendToDiskQueue,
+		memoryDeleteMutex: &t.memoryDeleteMutex,
 	}
 	defer flusher.Close()
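
A small Go detail worth noting in the hunk above: memoryToDiskFlusher and its visitor carry a *sync.RWMutex field and are handed &t.memoryDeleteMutex, so every participant locks the single mutex owned by TieredStorage; copying a sync.RWMutex by value would create an independent lock and provide no mutual exclusion (copying locks is also what go vet's copylocks check warns about). A hedged illustration with made-up owner/worker types:

package main

import (
	"fmt"
	"sync"
)

// owner holds the real mutex, the way TieredStorage owns memoryDeleteMutex.
type owner struct {
	deleteMutex sync.RWMutex
	data        map[string]int
}

// worker mirrors memoryToDiskFlusher: it keeps a pointer to the owner's
// mutex, so Lock/Unlock act on the very same lock the owner's readers use.
type worker struct {
	deleteMutex *sync.RWMutex
	data        map[string]int
}

// deleteKey holds the shared mutex only around the single deletion, the
// same granularity the flusher visitor uses per skip-list entry.
func (w *worker) deleteKey(k string) {
	w.deleteMutex.Lock()
	delete(w.data, k)
	w.deleteMutex.Unlock()
}

func main() {
	o := &owner{data: map[string]int{"a": 1}}
	w := &worker{deleteMutex: &o.deleteMutex, data: o.data}

	o.deleteMutex.RLock() // a reader pins the data against deletion
	done := make(chan struct{})
	go func() {
		w.deleteKey("a") // blocks until the reader releases its lock
		close(done)
	}()
	fmt.Println("read while the delete waits:", o.data["a"]) // 1
	o.deleteMutex.RUnlock()

	<-done
	fmt.Println("entries after the delete:", len(o.data)) // 0
}
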
@@ -372,15 +348,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 		recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
 	}()
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
+	// No samples may be deleted from memory while rendering a view.
+	t.memoryDeleteMutex.RLock()
+	defer t.memoryDeleteMutex.RUnlock()
-	var (
-		scans    = viewJob.builder.ScanJobs()
-		view     = newView()
-		// Get a single iterator that will be used for all data extraction below.
-		iterator = t.DiskStorage.MetricSamples.NewIterator(true)
-	)
+	scans := viewJob.builder.ScanJobs()
+	view := newView()
+	// Get a single iterator that will be used for all data extraction below.
+	iterator := t.DiskStorage.MetricSamples.NewIterator(true)
 	defer iterator.Close()
 	// Rebuilding of the frontier should happen on a conditional basis if a
@@ -410,7 +385,9 @@
 			targetTime := *standingOps[0].CurrentTime()
 			currentChunk := chunk{}
+			t.memoryMutex.RLock()
 			memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
+			t.memoryMutex.RUnlock()
 			// If we aimed before the oldest value in memory, load more data from disk.
 			if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
 				// XXX: For earnest performance gains analagous to the benchmarking we
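
Complementing the struct-level sketch earlier, the snippet below demonstrates the runtime property the renderView() comments rely on: while a render holds memoryDeleteMutex in read mode, appends (which only need memoryMutex) still go through, and a deletion attempt blocks until the render releases its read lock. Again a self-contained toy with illustrative names (store, addSample, get), not the TieredStorage code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// store is a toy with the same two mutexes as TieredStorage.
type store struct {
	memoryMutex       sync.RWMutex
	memoryDeleteMutex sync.RWMutex
	values            []int
}

// addSample appends under the memory mutex only, like AppendSamples.
func (s *store) addSample(v int) {
	s.memoryMutex.Lock()
	s.values = append(s.values, v)
	s.memoryMutex.Unlock()
}

// get reads under a short read lock, like the GetValueAtTime call above.
func (s *store) get(i int) int {
	s.memoryMutex.RLock()
	defer s.memoryMutex.RUnlock()
	return s.values[i]
}

func main() {
	s := &store{values: []int{10}}

	// "renderView" begins: pin memory against deletions for its duration.
	s.memoryDeleteMutex.RLock()

	appended := make(chan struct{})
	deleteDone := make(chan struct{})

	go func() { // an append does not need the delete mutex, so it proceeds
		s.addSample(20)
		close(appended)
	}()
	go func() { // a deletion must wait until the render drops its read lock
		s.memoryDeleteMutex.Lock()
		s.memoryMutex.Lock()
		s.values = s.values[1:]
		s.memoryMutex.Unlock()
		s.memoryDeleteMutex.Unlock()
		close(deleteDone)
	}()

	<-appended
	fmt.Println("during render:", s.get(0), s.get(1)) // 10 20: append landed
	time.Sleep(50 * time.Millisecond)                 // deleter is still blocked
	select {
	case <-deleteDone:
		fmt.Println("unexpected: deletion ran during the render")
	default:
		fmt.Println("deletion still waiting, as intended")
	}

	// Render finished; now the deletion may proceed.
	s.memoryDeleteMutex.RUnlock()
	<-deleteDone
	fmt.Println("after render:", s.get(0)) // 20
}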

View file

@@ -347,8 +347,6 @@ func testMakeView(t test.Tester, flushToDisk bool) {
 	if flushToDisk {
 		tiered.Flush()
-	} else {
-		tiered.writeMemory()
 	}
 	requestBuilder := NewViewRequestBuilder()