diff --git a/storage/metric/memory.go b/storage/metric/memory.go index 1197e58fc1..6187b79230 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -218,6 +218,70 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric model.Metric, fingerprint return series } +func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- model.Samples) { + emptySeries := []model.Fingerprint{} + + s.RLock() + for fingerprint, stream := range s.fingerprintToSeries { + finder := func(i int) bool { + return stream.values[i].Timestamp.After(flushOlderThan) + } + + stream.Lock() + + i := sort.Search(len(stream.values), finder) + toArchive := stream.values[:i] + toKeep := stream.values[i:] + queued := make(model.Samples, 0, len(toArchive)) + + for _, value := range toArchive { + queued = append(queued, model.Sample{ + Metric: stream.metric, + Timestamp: value.Timestamp, + Value: value.Value, + }) + } + + // BUG(all): this can deadlock if the queue is full, as we only ever clear + // the queue after calling this method: + // https://github.com/prometheus/prometheus/issues/275 + queue <- queued + + stream.values = toKeep + + if len(toKeep) == 0 { + emptySeries = append(emptySeries, fingerprint) + } + stream.Unlock() + } + s.RUnlock() + + s.Lock() + for _, fingerprint := range emptySeries { + if len(s.fingerprintToSeries[fingerprint].values) == 0 { + s.dropSeries(&fingerprint) + } + } + s.Unlock() +} + +// Drop all references to a series, including any samples. +func (s *memorySeriesStorage) dropSeries(fingerprint *model.Fingerprint) { + series, ok := s.fingerprintToSeries[*fingerprint] + if !ok { + return + } + for k, v := range series.metric { + labelPair := model.LabelPair{ + Name: k, + Value: v, + } + delete(s.labelPairToFingerprints, labelPair) + delete(s.labelNameToFingerprints, k) + } + delete(s.fingerprintToSeries, *fingerprint) +} + // Append raw samples, bypassing indexing. Only used to add data to views, // which don't need to lookup by metric. func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fingerprint, samples model.Values) { diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index c550c75e5c..b352612cb5 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -262,38 +262,10 @@ func (t *TieredStorage) Flush() { } func (t *TieredStorage) flushMemory(ttl time.Duration) { - t.memoryArena.RLock() - defer t.memoryArena.RUnlock() - - cutOff := time.Now().Add(-1 * ttl) + flushOlderThan := time.Now().Add(-1 * ttl) log.Println("Flushing...") - - for _, stream := range t.memoryArena.fingerprintToSeries { - finder := func(i int) bool { - return stream.values[i].Timestamp.After(cutOff) - } - - stream.Lock() - - i := sort.Search(len(stream.values), finder) - toArchive := stream.values[:i] - toKeep := stream.values[i:] - queued := make(model.Samples, 0, len(toArchive)) - - for _, value := range toArchive { - queued = append(queued, model.Sample{ - Metric: stream.metric, - Timestamp: value.Timestamp, - Value: value.Value, - }) - } - - t.appendToDiskQueue <- queued - - stream.values = toKeep - stream.Unlock() - } + t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue) queueLength := len(t.appendToDiskQueue) if queueLength > 0 {