Drop empty series from memory after flushing.

This commit is contained in:
Julius Volz 2013-06-19 11:55:34 +02:00
parent 235623b45d
commit 16364eda37
2 changed files with 66 additions and 30 deletions

View file

@ -218,6 +218,70 @@ func (s *memorySeriesStorage) getOrCreateSeries(metric model.Metric, fingerprint
return series return series
} }
func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- model.Samples) {
emptySeries := []model.Fingerprint{}
s.RLock()
for fingerprint, stream := range s.fingerprintToSeries {
finder := func(i int) bool {
return stream.values[i].Timestamp.After(flushOlderThan)
}
stream.Lock()
i := sort.Search(len(stream.values), finder)
toArchive := stream.values[:i]
toKeep := stream.values[i:]
queued := make(model.Samples, 0, len(toArchive))
for _, value := range toArchive {
queued = append(queued, model.Sample{
Metric: stream.metric,
Timestamp: value.Timestamp,
Value: value.Value,
})
}
// BUG(all): this can deadlock if the queue is full, as we only ever clear
// the queue after calling this method:
// https://github.com/prometheus/prometheus/issues/275
queue <- queued
stream.values = toKeep
if len(toKeep) == 0 {
emptySeries = append(emptySeries, fingerprint)
}
stream.Unlock()
}
s.RUnlock()
s.Lock()
for _, fingerprint := range emptySeries {
if len(s.fingerprintToSeries[fingerprint].values) == 0 {
s.dropSeries(&fingerprint)
}
}
s.Unlock()
}
// Drop all references to a series, including any samples.
func (s *memorySeriesStorage) dropSeries(fingerprint *model.Fingerprint) {
series, ok := s.fingerprintToSeries[*fingerprint]
if !ok {
return
}
for k, v := range series.metric {
labelPair := model.LabelPair{
Name: k,
Value: v,
}
delete(s.labelPairToFingerprints, labelPair)
delete(s.labelNameToFingerprints, k)
}
delete(s.fingerprintToSeries, *fingerprint)
}
// Append raw samples, bypassing indexing. Only used to add data to views, // Append raw samples, bypassing indexing. Only used to add data to views,
// which don't need to lookup by metric. // which don't need to lookup by metric.
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fingerprint, samples model.Values) { func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *model.Fingerprint, samples model.Values) {

View file

@ -262,38 +262,10 @@ func (t *TieredStorage) Flush() {
} }
func (t *TieredStorage) flushMemory(ttl time.Duration) { func (t *TieredStorage) flushMemory(ttl time.Duration) {
t.memoryArena.RLock() flushOlderThan := time.Now().Add(-1 * ttl)
defer t.memoryArena.RUnlock()
cutOff := time.Now().Add(-1 * ttl)
log.Println("Flushing...") log.Println("Flushing...")
t.memoryArena.Flush(flushOlderThan, t.appendToDiskQueue)
for _, stream := range t.memoryArena.fingerprintToSeries {
finder := func(i int) bool {
return stream.values[i].Timestamp.After(cutOff)
}
stream.Lock()
i := sort.Search(len(stream.values), finder)
toArchive := stream.values[:i]
toKeep := stream.values[i:]
queued := make(model.Samples, 0, len(toArchive))
for _, value := range toArchive {
queued = append(queued, model.Sample{
Metric: stream.metric,
Timestamp: value.Timestamp,
Value: value.Value,
})
}
t.appendToDiskQueue <- queued
stream.values = toKeep
stream.Unlock()
}
queueLength := len(t.appendToDiskQueue) queueLength := len(t.appendToDiskQueue)
if queueLength > 0 { if queueLength > 0 {