Only evict memory series after they are on disk.

This fixes the problem where samples become temporarily unavailable for
queries while they are being flushed to disk. Although the entire
flushing code could use some major refactoring, I'm explicitly trying to
do the minimal change to fix the problem since there's a whole new
storage implementation in the pipeline.

Change-Id: I0f5393a30b88654c73567456aeaea62f8b3756d9
This commit is contained in:
Julius Volz 2014-06-30 13:19:07 +02:00 committed by Bjoern Rabenstein
parent 8956faeccb
commit 21cafe6cd7
4 changed files with 33 additions and 12 deletions

View file

@ -33,7 +33,8 @@ type stream interface {
add(metric.Values)
clone() metric.Values
expunge(age clientmodel.Timestamp) metric.Values
getOlderThan(age clientmodel.Timestamp) metric.Values
evictOlderThan(age clientmodel.Timestamp)
size() int
clear()
@ -89,7 +90,19 @@ func (s *arrayStream) clone() metric.Values {
return clone
}
func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values {
func (s *arrayStream) getOlderThan(t clientmodel.Timestamp) metric.Values {
s.RLock()
defer s.RUnlock()
finder := func(i int) bool {
return s.values[i].Timestamp.After(t)
}
i := sort.Search(len(s.values), finder)
return s.values[:i]
}
func (s *arrayStream) evictOlderThan(t clientmodel.Timestamp) {
s.Lock()
defer s.Unlock()
@ -98,10 +111,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values {
}
i := sort.Search(len(s.values), finder)
expunged := s.values[:i]
s.values = s.values[i:]
return expunged
}
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values {
@ -282,11 +292,9 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *client
}
func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) {
emptySeries := []clientmodel.Fingerprint{}
s.RLock()
for fingerprint, stream := range s.fingerprintToSeries {
toArchive := stream.expunge(flushOlderThan)
for _, stream := range s.fingerprintToSeries {
toArchive := stream.getOlderThan(flushOlderThan)
queued := make(clientmodel.Samples, 0, len(toArchive))
// NOTE: This duplication will go away soon.
for _, value := range toArchive {
@ -303,20 +311,29 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue
if len(queued) > 0 {
queue <- queued
}
}
s.RUnlock()
}
func (s *memorySeriesStorage) Evict(flushOlderThan clientmodel.Timestamp) {
emptySeries := []clientmodel.Fingerprint{}
s.RLock()
for fingerprint, stream := range s.fingerprintToSeries {
stream.evictOlderThan(flushOlderThan)
if stream.size() == 0 {
emptySeries = append(emptySeries, fingerprint)
}
}
s.RUnlock()
s.Lock()
for _, fingerprint := range emptySeries {
if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 {
s.Lock()
s.dropSeries(&fingerprint)
}
}
s.Unlock()
}
}
}
// Drop a label value from the label names to label values index.

View file

@ -171,7 +171,8 @@ func TestDroppedSeriesIndexRegression(t *testing.T) {
}
toDisk := make(chan clientmodel.Samples, 2)
s.Flush(clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)), toDisk)
flushOlderThan := clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC))
s.Flush(flushOlderThan, toDisk)
if len(toDisk) != 1 {
t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk))
}
@ -179,6 +180,7 @@ func TestDroppedSeriesIndexRegression(t *testing.T) {
if len(diskSamples) != 1 {
t.Fatalf("Got %d disk samples, expected 1", len(diskSamples))
}
s.Evict(flushOlderThan)
fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common))
if err != nil {

View file

@ -364,6 +364,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) {
glog.Infof("Writing %d samples...", len(samples))
t.DiskStorage.AppendSamples(samples)
}
t.memoryArena.Evict(flushOlderThan)
glog.Info("Done flushing.")
}

View file

@ -15,6 +15,7 @@ package api
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"