diff --git a/main.go b/main.go
index 3f2d522c21..679de45b3e 100644
--- a/main.go
+++ b/main.go
@@ -133,7 +133,7 @@ func main() {
 		log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
 	}
 
-	ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
+	ts, err := metric.NewTieredStorage(uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
 	if err != nil {
 		log.Fatalf("Error opening storage: %s", err)
 	}
diff --git a/storage/metric/interface.go b/storage/metric/interface.go
index 1e49a5407b..a336da50b1 100644
--- a/storage/metric/interface.go
+++ b/storage/metric/interface.go
@@ -33,7 +33,7 @@ type MetricPersistence interface {
 	// Record a new sample in the storage layer.
 	AppendSample(model.Sample) error
 
-	// Record a new sample in the storage layer.
+	// Record a group of new samples in the storage layer.
 	AppendSamples(model.Samples) error
 
 	// Get all of the metric fingerprints that are associated with the provided
diff --git a/storage/metric/test_helper.go b/storage/metric/test_helper.go
index 4ca0621951..f272e55a0b 100644
--- a/storage/metric/test_helper.go
+++ b/storage/metric/test_helper.go
@@ -26,10 +26,6 @@ var (
 	testInstant = time.Date(1972, 7, 18, 19, 5, 45, 0, usEastern).In(time.UTC)
 )
 
-const (
-	appendQueueSize = 100
-)
-
 func testAppendSample(p MetricPersistence, s model.Sample, t test.Tester) {
 	err := p.AppendSample(s)
 	if err != nil {
@@ -90,7 +86,7 @@ func (t testTieredStorageCloser) Close() {
 func NewTestTieredStorage(t test.Tester) (storage *TieredStorage, closer test.Closer) {
 	var directory test.TemporaryDirectory
 	directory = test.NewTemporaryDirectory("test_tiered_storage", t)
-	storage, err := NewTieredStorage(appendQueueSize, 2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
+	storage, err := NewTieredStorage(2500, 1000, 5*time.Second, 15*time.Second, 0*time.Second, directory.Path())
 
 	if err != nil {
 		if storage != nil {
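NewTieredStorage loses its leading appendToMemoryQueueDepth parameter, so every call site shifts its remaining arguments left by one, as the main.go and test_helper.go hunks above show. Because the queue depths are plain uints, the compiler only checks the new arity, not whether the correct argument was dropped. A minimal sketch of the call-shape change; newTieredStorage is a hypothetical stand-in mirroring only the updated parameter list, not the real constructor:

    package main

    import "time"

    // newTieredStorage mirrors only the parameter list of the updated
    // metric.NewTieredStorage; it is a stand-in for illustration.
    func newTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) error {
        return nil
    }

    func main() {
        // Before: NewTieredStorage(memoryQueueDepth, diskQueueDepth, 100, ...)
        // After: the memory-queue depth is gone and everything shifts left.
        // Untyped constants like 100 satisfy either uint parameter, so only
        // the argument count is compiler-checked.
        _ = newTieredStorage(500, 100, 30*time.Second, 1*time.Second, 20*time.Second, "/tmp/metrics")
    }
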
diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go
index 5e848e9b7c..bca86f983d 100644
--- a/storage/metric/tiered.go
+++ b/storage/metric/tiered.go
@@ -60,13 +60,17 @@ type TieredStorage struct {
 	DiskStorage *LevelDBMetricPersistence
 
 	appendToDiskQueue   chan model.Samples
-	appendToMemoryQueue chan model.Samples
 	diskFrontier        *diskFrontier
 	draining            chan chan bool
 	flushMemoryInterval time.Duration
 	memoryArena         memorySeriesStorage
 	memoryTTL           time.Duration
-	mutex               sync.Mutex
+	// This mutex manages any concurrent reads/writes of the memoryArena.
+	memoryMutex sync.RWMutex
+	// This mutex blocks only deletions from the memoryArena. It is held for a
+	// potentially long time for an entire renderView() duration, since we depend
+	// on no samples being removed from memory after grabbing a LevelDB snapshot.
+	memoryDeleteMutex sync.RWMutex
 	viewQueue           chan viewJob
 	writeMemoryInterval time.Duration
 }
@@ -79,7 +83,7 @@ type viewJob struct {
 	err     chan error
 }
 
-func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
+func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) (storage *TieredStorage, err error) {
 	diskStorage, err := NewLevelDBMetricPersistence(root)
 	if err != nil {
 		return
@@ -87,7 +91,6 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueu
 
 	storage = &TieredStorage{
 		appendToDiskQueue:   make(chan model.Samples, appendToDiskQueueDepth),
-		appendToMemoryQueue: make(chan model.Samples, appendToMemoryQueueDepth),
 		DiskStorage:         diskStorage,
 		draining:            make(chan chan bool),
 		flushMemoryInterval: flushMemoryInterval,
@@ -100,12 +103,14 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueu
 }
 
 // Enqueues Samples for storage.
-func (t *TieredStorage) AppendSamples(s model.Samples) (err error) {
+func (t *TieredStorage) AppendSamples(samples model.Samples) (err error) {
 	if len(t.draining) > 0 {
 		return fmt.Errorf("Storage is in the process of draining.")
 	}
 
-	t.appendToMemoryQueue <- s
+	t.memoryMutex.Lock()
+	t.memoryArena.AppendSamples(samples)
+	t.memoryMutex.Unlock()
 
 	return
 }
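The core of the commit is this locking scheme: AppendSamples now writes to the memory arena synchronously under memoryMutex instead of going through a queue, while memoryDeleteMutex protects view renders from concurrent deletions. The interplay is easiest to see in isolation; the following toy mirrors the pattern under stated assumptions (a map stands in for memorySeriesStorage, and all names are illustrative, not the real API):

    package main

    import (
        "fmt"
        "sync"
    )

    // toyStorage mirrors TieredStorage's two-mutex scheme: memoryMutex guards
    // reads and writes of the in-memory arena, while memoryDeleteMutex is held
    // in read mode for an entire view render so no sample can be deleted out
    // from under the render after it takes its disk snapshot.
    type toyStorage struct {
        memoryMutex       sync.RWMutex
        memoryDeleteMutex sync.RWMutex
        arena             map[string]int
    }

    func (t *toyStorage) appendSample(k string, v int) {
        t.memoryMutex.Lock() // writer: excludes arena readers and other writers
        t.arena[k] = v
        t.memoryMutex.Unlock()
    }

    func (t *toyStorage) renderView(k string) int {
        t.memoryDeleteMutex.RLock() // deletions are blocked for the whole render
        defer t.memoryDeleteMutex.RUnlock()

        t.memoryMutex.RLock() // brief read of the arena
        defer t.memoryMutex.RUnlock()
        return t.arena[k]
    }

    func (t *toyStorage) flushOldSamples(k string) {
        t.memoryMutex.RLock() // excludes appends while the flusher walks the arena
        defer t.memoryMutex.RUnlock()

        t.memoryDeleteMutex.Lock() // per entry: waits for in-flight renders
        delete(t.arena, k)
        t.memoryDeleteMutex.Unlock()
    }

    func main() {
        t := &toyStorage{arena: map[string]int{}}
        t.appendSample("a", 1)
        fmt.Println(t.renderView("a")) // 1
        t.flushOldSamples("a")
    }
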
@@ -175,8 +180,6 @@ func (t *TieredStorage) rebuildDiskFrontier(i leveldb.Iterator) (err error) {
 func (t *TieredStorage) Serve() {
 	flushMemoryTicker := time.NewTicker(t.flushMemoryInterval)
 	defer flushMemoryTicker.Stop()
-	writeMemoryTicker := time.NewTicker(t.writeMemoryInterval)
-	defer writeMemoryTicker.Stop()
 	reportTicker := time.NewTicker(time.Second)
 	defer reportTicker.Stop()
 
@@ -188,14 +191,12 @@ func (t *TieredStorage) Serve() {
 
 	for {
 		select {
-		case <-writeMemoryTicker.C:
-			t.writeMemory()
 		case <-flushMemoryTicker.C:
 			t.flushMemory()
 		case viewRequest := <-t.viewQueue:
 			t.renderView(viewRequest)
 		case drainingDone := <-t.draining:
-			t.flush()
+			t.Flush()
 			drainingDone <- true
 			return
 		}
@@ -206,33 +207,12 @@ func (t *TieredStorage) reportQueues() {
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
 
-	queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
-	queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
-
 	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
 	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
 }
 
-func (t *TieredStorage) writeMemory() {
-	begin := time.Now()
-	defer func() {
-		duration := time.Since(begin)
-
-		recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
-	}()
-
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-
-	pendingLength := len(t.appendToMemoryQueue)
-
-	for i := 0; i < pendingLength; i++ {
-		t.memoryArena.AppendSamples(<-t.appendToMemoryQueue)
-	}
-}
-
 func (t *TieredStorage) Flush() {
-	t.flush()
+	t.flushMemory()
 }
 
 func (t *TieredStorage) Close() {
@@ -242,31 +222,23 @@ func (t *TieredStorage) Close() {
 	t.memoryArena.Close()
 	close(t.appendToDiskQueue)
-	close(t.appendToMemoryQueue)
 	close(t.viewQueue)
 
 	log.Println("Done.")
 }
 
-// Write all pending appends.
-func (t *TieredStorage) flush() (err error) {
-	// Trim any old values to reduce iterative write costs.
-	t.flushMemory()
-	t.writeMemory()
-	t.flushMemory()
-	return
-}
-
 type memoryToDiskFlusher struct {
-	toDiskQueue    chan model.Samples
-	disk           MetricPersistence
-	olderThan      time.Time
-	valuesAccepted int
-	valuesRejected int
+	toDiskQueue       chan model.Samples
+	disk              MetricPersistence
+	olderThan         time.Time
+	valuesAccepted    int
+	valuesRejected    int
+	memoryDeleteMutex *sync.RWMutex
 }
 
 type memoryToDiskFlusherVisitor struct {
-	stream  stream
-	flusher *memoryToDiskFlusher
+	stream            stream
+	flusher           *memoryToDiskFlusher
+	memoryDeleteMutex *sync.RWMutex
 }
 
 func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
@@ -311,15 +283,18 @@ func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storag
 		},
 	}
 
+	f.memoryDeleteMutex.Lock()
 	f.stream.values.Delete(skipListTime(recordTime))
+	f.memoryDeleteMutex.Unlock()
 
 	return
 }
 
 func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
 	visitor := memoryToDiskFlusherVisitor{
-		stream:  stream,
-		flusher: f,
+		stream:            stream,
+		flusher:           f,
+		memoryDeleteMutex: f.memoryDeleteMutex,
 	}
 
 	return visitor, visitor, visitor
@@ -338,7 +313,7 @@ func (f memoryToDiskFlusher) Close() {
 	f.Flush()
 }
 
-// Persist a whole bunch of samples to the datastore.
+// Persist a whole bunch of samples from memory to the datastore.
 func (t *TieredStorage) flushMemory() {
 	begin := time.Now()
 	defer func() {
@@ -347,13 +322,14 @@ func (t *TieredStorage) flushMemory() {
 		recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
 	}()
 
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
+	t.memoryMutex.RLock()
+	defer t.memoryMutex.RUnlock()
 
 	flusher := &memoryToDiskFlusher{
-		disk:        t.DiskStorage,
-		olderThan:   time.Now().Add(-1 * t.memoryTTL),
-		toDiskQueue: t.appendToDiskQueue,
+		disk:              t.DiskStorage,
+		olderThan:         time.Now().Add(-1 * t.memoryTTL),
+		toDiskQueue:       t.appendToDiskQueue,
+		memoryDeleteMutex: &t.memoryDeleteMutex,
 	}
 	defer flusher.Close()
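With writeMemory and its ticker gone, the Serve() loop above reduces to one flush ticker, the view queue, and the drain handshake, and draining now calls the exported Flush(), a single flushMemory pass rather than the old flushMemory/writeMemory/flushMemory sequence. A condensed, self-contained sketch of that control flow (channel payloads and method bodies simplified to prints):

    package main

    import (
        "fmt"
        "time"
    )

    // serve condenses the post-change Serve() loop: a flush ticker, a view
    // queue, and a drain handshake that performs one final flush and exits.
    func serve(flushEvery time.Duration, viewQueue chan string, draining chan chan bool) {
        flushTicker := time.NewTicker(flushEvery)
        defer flushTicker.Stop()

        for {
            select {
            case <-flushTicker.C:
                fmt.Println("flushMemory: persist old samples, delete them from memory")
            case req := <-viewQueue:
                fmt.Println("renderView:", req)
            case done := <-draining:
                fmt.Println("Flush: one final flushMemory before shutdown")
                done <- true
                return
            }
        }
    }

    func main() {
        views := make(chan string, 1)
        draining := make(chan chan bool)
        go serve(10*time.Millisecond, views, draining)

        views <- "example view request"

        done := make(chan bool)
        draining <- done
        <-done
    }
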
@@ -372,15 +348,14 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 		recordOutcome(duration, err, map[string]string{operation: renderView, result: success}, map[string]string{operation: renderView, result: failure})
 	}()
 
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
+	// No samples may be deleted from memory while rendering a view.
+	t.memoryDeleteMutex.RLock()
+	defer t.memoryDeleteMutex.RUnlock()
 
-	var (
-		scans = viewJob.builder.ScanJobs()
-		view  = newView()
-		// Get a single iterator that will be used for all data extraction below.
-		iterator = t.DiskStorage.MetricSamples.NewIterator(true)
-	)
+	scans := viewJob.builder.ScanJobs()
+	view := newView()
+	// Get a single iterator that will be used for all data extraction below.
+	iterator := t.DiskStorage.MetricSamples.NewIterator(true)
 	defer iterator.Close()
 
 	// Rebuilding of the frontier should happen on a conditional basis if a
@@ -410,7 +385,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
 			targetTime := *standingOps[0].CurrentTime()
 
 			currentChunk := chunk{}
+			t.memoryMutex.RLock()
 			memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
+			t.memoryMutex.RUnlock()
 			// If we aimed before the oldest value in memory, load more data from disk.
 			if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
 				// XXX: For earnest performance gains analagous to the benchmarking we
diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go
index b8cf2ee2b5..614549c20f 100644
--- a/storage/metric/tiered_test.go
+++ b/storage/metric/tiered_test.go
@@ -347,8 +347,6 @@ func testMakeView(t test.Tester, flushToDisk bool) {
 
 	if flushToDisk {
 		tiered.Flush()
-	} else {
-		tiered.writeMemory()
 	}
 
 	requestBuilder := NewViewRequestBuilder()
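The tiered_test.go change is the observable consequence: appends now land in the memory arena synchronously, so the in-memory test variant no longer needs an explicit writeMemory step before building a view; only the flush-to-disk variant still calls Flush(). A compilable, test-shaped sketch of that branch, using a fake that keeps just the memory/disk split (not the real TieredStorage API):

    package metricsketch

    import "testing"

    // fakeTiered keeps only the behavior the test change relies on:
    // AppendSamples is synchronous, and Flush moves samples to "disk".
    type fakeTiered struct {
        memory []float64
        disk   []float64
    }

    func (f *fakeTiered) AppendSamples(s []float64) { f.memory = append(f.memory, s...) }

    func (f *fakeTiered) Flush() {
        f.disk = append(f.disk, f.memory...)
        f.memory = nil
    }

    // TestMakeViewSketch mirrors testMakeView's branch: with synchronous
    // appends there is no writeMemory call in the in-memory case, and the
    // same samples are visible either way.
    func TestMakeViewSketch(t *testing.T) {
        for _, flushToDisk := range []bool{true, false} {
            f := &fakeTiered{}
            f.AppendSamples([]float64{1, 2, 3})
            if flushToDisk {
                f.Flush()
            }
            if total := len(f.memory) + len(f.disk); total != 3 {
                t.Errorf("flushToDisk=%v: expected 3 samples visible, got %d", flushToDisk, total)
            }
        }
    }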