diff --git a/storage/local/persistence.go b/storage/local/persistence.go index bdbecb902f..1dc1a939f4 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -84,7 +84,7 @@ type indexingOp struct { // A Persistence is used by a Storage implementation to store samples // persistently across restarts. The methods are only goroutine-safe if -// explicitly marked as such below. The chunk-related methods PersistChunk, +// explicitly marked as such below. The chunk-related methods persistChunk, // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { diff --git a/storage/local/storage.go b/storage/local/storage.go index 93da63e403..4c41f2ba35 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -39,6 +39,9 @@ const ( maxEvictInterval = time.Minute headChunkTimeout = time.Hour // Close head chunk if not touched for that long. + + appendWorkers = 8 // Should be enough to not make appending a bottleneck. + appendQueueCap = 2 * appendWorkers ) type storageState uint @@ -69,10 +72,9 @@ type memorySeriesStorage struct { checkpointInterval time.Duration checkpointDirtySeriesLimit int - // The timestamp of the last sample appended. - lastTimestampAppended clientmodel.Timestamp - // Wait group for goroutines appending samples with the same timestamp. - appendWaitGroup sync.WaitGroup + appendQueue chan *clientmodel.Sample + appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. + appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed. persistQueue chan persistRequest persistStopped chan struct{} @@ -129,7 +131,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { }) numSeries.Set(float64(fpToSeries.length())) - return &memorySeriesStorage{ + s := &memorySeriesStorage{ fpLocker: newFingerprintLocker(1024), fpToSeries: fpToSeries, @@ -140,7 +142,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, - lastTimestampAppended: clientmodel.Earliest, + appendLastTimestamp: clientmodel.Earliest, + appendQueue: make(chan *clientmodel.Sample, appendQueueCap), persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity), persistStopped: make(chan struct{}), @@ -201,7 +204,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { Name: "invalid_preload_requests_total", Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", }), - }, nil + } + + for i := 0; i < appendWorkers; i++ { + go func() { + for sample := range s.appendQueue { + s.appendSample(sample) + s.appendWaitGroup.Done() + } + }() + } + + return s, nil } // Start implements Storage. @@ -215,6 +229,11 @@ func (s *memorySeriesStorage) Start() { func (s *memorySeriesStorage) Stop() error { glog.Info("Stopping local storage...") + glog.Info("Draining append queue...") + close(s.appendQueue) + s.appendWaitGroup.Wait() + glog.Info("Append queue drained.") + glog.Info("Stopping maintenance loop...") close(s.loopStopping) <-s.loopStopped @@ -371,18 +390,18 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint // AppendSamples implements Storage. func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { for _, sample := range samples { - if sample.Timestamp != s.lastTimestampAppended { - // Timestamp has changed. We have to wait for all - // appendSample to complete before proceeding. + if sample.Timestamp != s.appendLastTimestamp { + // Timestamp has changed. We have to wait for processing + // of all appended samples before proceeding. Otherwise, + // we might violate the storage contract that each + // sample appended to a given series has to have a + // timestamp greater or equal to the previous sample + // appended to that series. s.appendWaitGroup.Wait() - s.lastTimestampAppended = sample.Timestamp + s.appendLastTimestamp = sample.Timestamp } - s.appendWaitGroup.Add(1) - go func(sample *clientmodel.Sample) { - s.appendSample(sample) - s.appendWaitGroup.Done() - }(sample) + s.appendQueue <- sample } }