diff --git a/main.go b/main.go index 199846fd7..e8f8ab7a7 100644 --- a/main.go +++ b/main.go @@ -218,11 +218,9 @@ func (p *prometheus) Serve() { }() for samples := range p.unwrittenSamples { - if len(samples) > 0 { - p.storage.AppendSamples(samples) - if p.remoteTSDBQueue != nil { - p.remoteTSDBQueue.Queue(samples) - } + p.storage.AppendSamples(samples) + if p.remoteTSDBQueue != nil { + p.remoteTSDBQueue.Queue(samples) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d170b1f23..c3206fa53 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -16,7 +16,6 @@ package local import ( "fmt" "io" - "math" "os" "path" "strings" @@ -342,7 +341,7 @@ func (p *persistence) cleanUpArchiveIndexes( if err := kv.Value(&m); err != nil { return err } - series := newMemorySeries(clientmodel.Metric(m), false, math.MinInt64) + series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest) cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now()) if err != nil { return err diff --git a/storage/local/interface.go b/storage/local/interface.go index 6d5c1d8f5..bdfa5698d 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -24,12 +24,15 @@ import ( ) // Storage ingests and manages samples, along with various indexes. All methods -// are goroutine-safe. +// except AppendSamples are goroutine-safe. type Storage interface { prometheus.Collector - // AppendSamples stores a group of new samples. Multiple samples for the same - // fingerprint need to be submitted in chronological order, from oldest to - // newest (both in the same call to AppendSamples and across multiple calls). + // AppendSamples stores a group of new samples. Multiple samples for the + // same fingerprint need to be submitted in chronological order, from + // oldest to newest (both in the same call to AppendSamples and across + // multiple calls). When AppendSamples has returned, the appended + // samples might not be queryable immediately. (Use WaitForIndexing to + // wait for complete processing.) This method is not goroutine-safe. AppendSamples(clientmodel.Samples) // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. diff --git a/storage/local/persistence.go b/storage/local/persistence.go index bdbecb902..1dc1a939f 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -84,7 +84,7 @@ type indexingOp struct { // A Persistence is used by a Storage implementation to store samples // persistently across restarts. The methods are only goroutine-safe if -// explicitly marked as such below. The chunk-related methods PersistChunk, +// explicitly marked as such below. The chunk-related methods persistChunk, // dropChunks, loadChunks, and loadChunkDescs can be called concurrently with // each other if each call refers to a different fingerprint. type persistence struct { diff --git a/storage/local/series.go b/storage/local/series.go index 8f47dc639..0c1065a9d 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -14,7 +14,6 @@ package local import ( - "math" "sort" "sync" "sync/atomic" @@ -169,7 +168,7 @@ type memorySeries struct { // will be set properly upon the first eviction of chunkDescs). func newMemorySeries(m clientmodel.Metric, reallyNew bool, firstTime clientmodel.Timestamp) *memorySeries { if reallyNew { - firstTime = math.MinInt64 + firstTime = clientmodel.Earliest } s := memorySeries{ metric: m, @@ -337,7 +336,7 @@ func (s *memorySeries) preloadChunksForRange( from clientmodel.Timestamp, through clientmodel.Timestamp, fp clientmodel.Fingerprint, mss *memorySeriesStorage, ) ([]*chunkDesc, error) { - firstChunkDescTime := clientmodel.Timestamp(math.MaxInt64) + firstChunkDescTime := clientmodel.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() } diff --git a/storage/local/storage.go b/storage/local/storage.go index 843929d2c..4c41f2ba3 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,7 +16,7 @@ package local import ( "container/list" - "math" + "sync" "sync/atomic" "time" @@ -34,11 +34,14 @@ const ( // See waitForNextFP. fpMaxWaitDuration = 10 * time.Second - fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time. + fpMinWaitDuration = 20 * time.Millisecond // A small multiple of disk seek time. fpMaxSweepTime = 6 * time.Hour maxEvictInterval = time.Minute headChunkTimeout = time.Hour // Close head chunk if not touched for that long. + + appendWorkers = 8 // Should be enough to not make appending a bottleneck. + appendQueueCap = 2 * appendWorkers ) type storageState uint @@ -69,6 +72,10 @@ type memorySeriesStorage struct { checkpointInterval time.Duration checkpointDirtySeriesLimit int + appendQueue chan *clientmodel.Sample + appendLastTimestamp clientmodel.Timestamp // The timestamp of the last sample sent to the append queue. + appendWaitGroup sync.WaitGroup // To wait for all appended samples to be processed. + persistQueue chan persistRequest persistStopped chan struct{} persistence *persistence @@ -124,8 +131,8 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { }) numSeries.Set(float64(fpToSeries.length())) - return &memorySeriesStorage{ - fpLocker: newFingerprintLocker(256), + s := &memorySeriesStorage{ + fpLocker: newFingerprintLocker(1024), fpToSeries: fpToSeries, loopStopping: make(chan struct{}), @@ -135,6 +142,9 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { checkpointInterval: o.CheckpointInterval, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, + appendLastTimestamp: clientmodel.Earliest, + appendQueue: make(chan *clientmodel.Sample, appendQueueCap), + persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity), persistStopped: make(chan struct{}), persistence: p, @@ -194,7 +204,18 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { Name: "invalid_preload_requests_total", Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", }), - }, nil + } + + for i := 0; i < appendWorkers; i++ { + go func() { + for sample := range s.appendQueue { + s.appendSample(sample) + s.appendWaitGroup.Done() + } + }() + } + + return s, nil } // Start implements Storage. @@ -208,6 +229,11 @@ func (s *memorySeriesStorage) Start() { func (s *memorySeriesStorage) Stop() error { glog.Info("Stopping local storage...") + glog.Info("Draining append queue...") + close(s.appendQueue) + s.appendWaitGroup.Wait() + glog.Info("Append queue drained.") + glog.Info("Stopping maintenance loop...") close(s.loopStopping) <-s.loopStopped @@ -234,6 +260,9 @@ func (s *memorySeriesStorage) Stop() error { // WaitForIndexing implements Storage. func (s *memorySeriesStorage) WaitForIndexing() { + // First let all goroutines appending samples stop. + s.appendWaitGroup.Wait() + // Only then wait for the persistence to index them. s.persistence.waitForIndexing() } @@ -361,10 +390,19 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint // AppendSamples implements Storage. func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { for _, sample := range samples { - s.appendSample(sample) + if sample.Timestamp != s.appendLastTimestamp { + // Timestamp has changed. We have to wait for processing + // of all appended samples before proceeding. Otherwise, + // we might violate the storage contract that each + // sample appended to a given series has to have a + // timestamp greater or equal to the previous sample + // appended to that series. + s.appendWaitGroup.Wait() + s.appendLastTimestamp = sample.Timestamp + } + s.appendWaitGroup.Add(1) + s.appendQueue <- sample } - - s.ingestedSamplesCount.Add(float64(len(samples))) } func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { @@ -376,6 +414,8 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { Timestamp: sample.Timestamp, }) s.fpLocker.Unlock(fp) + s.ingestedSamplesCount.Inc() + if len(chunkDescsToPersist) == 0 { return } @@ -700,7 +740,16 @@ loop: s.seriesOps.WithLabelValues(archiveMaintenance).Inc() case <-s.countPersistedHeadChunks: headChunksPersistedSinceLastCheckpoint++ - if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit { + // Check if we have enough "dirty" series so that we need an early checkpoint. + // As described above, we take the headChunksPersistedSinceLastCheckpoint as a + // heuristic for "dirty" series. However, if we are already backlogging + // chunks to be persisted, creating a checkpoint would be counterproductive, + // as it would slow down chunk persisting even more, while in a situation like + // that, the best we can do for crash recovery is to work through the persist + // queue as quickly as possible. So only checkpoint if s.persistQueue is + // at most 20% full. + if headChunksPersistedSinceLastCheckpoint >= s.checkpointDirtySeriesLimit && + len(s.persistQueue) < cap(s.persistQueue)/5 { checkpointTimer.Reset(0) } } @@ -751,7 +800,7 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) { // Make sure we have a head chunk descriptor (a freshly // unarchived series has none). if len(series.chunkDescs) == 0 { - cds, err := s.loadChunkDescs(fp, math.MaxInt64) + cds, err := s.loadChunkDescs(fp, clientmodel.Latest) if err != nil { glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err) return diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 461cd8dcd..9b5a37e7e 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -70,6 +70,7 @@ func TestChunk(t *testing.T) { defer closer.Close() s.AppendSamples(samples) + s.WaitForIndexing() for m := range s.(*memorySeriesStorage).fpToSeries.iter() { s.(*memorySeriesStorage).fpLocker.Lock(m.fp) @@ -109,6 +110,7 @@ func TestGetValueAtTime(t *testing.T) { defer closer.Close() s.AppendSamples(samples) + s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -191,6 +193,7 @@ func TestGetRangeValues(t *testing.T) { defer closer.Close() s.AppendSamples(samples) + s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -334,6 +337,7 @@ func TestEvictAndPurgeSeries(t *testing.T) { ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method. s.AppendSamples(samples) + s.WaitForIndexing() fp := clientmodel.Metric{}.Fingerprint() @@ -368,6 +372,7 @@ func TestEvictAndPurgeSeries(t *testing.T) { // Recreate series. s.AppendSamples(samples) + s.WaitForIndexing() series, ok := ms.fpToSeries.get(fp) if !ok { @@ -450,6 +455,7 @@ func TestFuzz(t *testing.T) { samples := createRandomSamples() s.AppendSamples(samples) + s.WaitForIndexing() return verifyStorage(t, s, samples, 24*7*time.Hour) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 526400911..71bae548e 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -115,6 +115,9 @@ func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { // Queue queues a sample batch to be sent to the TSDB. It drops the most // recently queued samples on the floor if the queue is full. func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { + if len(s) == 0 { + return + } select { case t.queue <- s: default: