Only evict memory series after they are on disk.

This fixes the problem where samples become temporarily unavailable for
queries while they are being flushed to disk. Although the entire
flushing code could use some major refactoring, I'm explicitly trying to
do the minimal change to fix the problem since there's a whole new
storage implementation in the pipeline.

Change-Id: I0f5393a30b88654c73567456aeaea62f8b3756d9
This commit is contained in:
Julius Volz 2014-06-30 13:19:07 +02:00
parent 2128d9d811
commit 9410f15a73
4 changed files with 33 additions and 12 deletions

View file

@ -33,7 +33,8 @@ type stream interface {
add(metric.Values) add(metric.Values)
clone() metric.Values clone() metric.Values
expunge(age clientmodel.Timestamp) metric.Values getOlderThan(age clientmodel.Timestamp) metric.Values
evictOlderThan(age clientmodel.Timestamp)
size() int size() int
clear() clear()
@ -89,7 +90,19 @@ func (s *arrayStream) clone() metric.Values {
return clone return clone
} }
func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { func (s *arrayStream) getOlderThan(t clientmodel.Timestamp) metric.Values {
s.RLock()
defer s.RUnlock()
finder := func(i int) bool {
return s.values[i].Timestamp.After(t)
}
i := sort.Search(len(s.values), finder)
return s.values[:i]
}
func (s *arrayStream) evictOlderThan(t clientmodel.Timestamp) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -98,10 +111,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values {
} }
i := sort.Search(len(s.values), finder) i := sort.Search(len(s.values), finder)
expunged := s.values[:i]
s.values = s.values[i:] s.values = s.values[i:]
return expunged
} }
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values { func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values {
@ -282,11 +292,9 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *client
} }
func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) { func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) {
emptySeries := []clientmodel.Fingerprint{}
s.RLock() s.RLock()
for fingerprint, stream := range s.fingerprintToSeries { for _, stream := range s.fingerprintToSeries {
toArchive := stream.expunge(flushOlderThan) toArchive := stream.getOlderThan(flushOlderThan)
queued := make(clientmodel.Samples, 0, len(toArchive)) queued := make(clientmodel.Samples, 0, len(toArchive))
// NOTE: This duplication will go away soon. // NOTE: This duplication will go away soon.
for _, value := range toArchive { for _, value := range toArchive {
@ -303,20 +311,29 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue
if len(queued) > 0 { if len(queued) > 0 {
queue <- queued queue <- queued
} }
}
s.RUnlock()
}
func (s *memorySeriesStorage) Evict(flushOlderThan clientmodel.Timestamp) {
emptySeries := []clientmodel.Fingerprint{}
s.RLock()
for fingerprint, stream := range s.fingerprintToSeries {
stream.evictOlderThan(flushOlderThan)
if stream.size() == 0 { if stream.size() == 0 {
emptySeries = append(emptySeries, fingerprint) emptySeries = append(emptySeries, fingerprint)
} }
} }
s.RUnlock() s.RUnlock()
s.Lock()
for _, fingerprint := range emptySeries { for _, fingerprint := range emptySeries {
if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 { if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 {
s.Lock()
s.dropSeries(&fingerprint) s.dropSeries(&fingerprint)
}
}
s.Unlock() s.Unlock()
}
}
} }
// Drop a label value from the label names to label values index. // Drop a label value from the label names to label values index.

View file

@ -171,7 +171,8 @@ func TestDroppedSeriesIndexRegression(t *testing.T) {
} }
toDisk := make(chan clientmodel.Samples, 2) toDisk := make(chan clientmodel.Samples, 2)
s.Flush(clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)), toDisk) flushOlderThan := clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC))
s.Flush(flushOlderThan, toDisk)
if len(toDisk) != 1 { if len(toDisk) != 1 {
t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk)) t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk))
} }
@ -179,6 +180,7 @@ func TestDroppedSeriesIndexRegression(t *testing.T) {
if len(diskSamples) != 1 { if len(diskSamples) != 1 {
t.Fatalf("Got %d disk samples, expected 1", len(diskSamples)) t.Fatalf("Got %d disk samples, expected 1", len(diskSamples))
} }
s.Evict(flushOlderThan)
fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common)) fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common))
if err != nil { if err != nil {

View file

@ -364,6 +364,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
glog.Infof("Writing %d samples...", len(samples)) glog.Infof("Writing %d samples...", len(samples))
t.DiskStorage.AppendSamples(samples) t.DiskStorage.AppendSamples(samples)
} }
t.memoryArena.Evict(flushOlderThan)
glog.Info("Done flushing.") glog.Info("Done flushing.")
} }

View file

@ -15,6 +15,7 @@ package api
import ( import (
"net/http" "net/http"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/config"