From 21cafe6cd7a349ffd437d0b0baa2dcd15bf49c99 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 30 Jun 2014 13:19:07 +0200 Subject: [PATCH] Only evict memory series after they are on disk. This fixes the problem where samples become temporarily unavailable for queries while they are being flushed to disk. Although the entire flushing code could use some major refactoring, I'm explicitly trying to do the minimal change to fix the problem since there's a whole new storage implementation in the pipeline. Change-Id: I0f5393a30b88654c73567456aeaea62f8b3756d9 --- storage/metric/tiered/memory.go | 39 ++++++++++++++++++++-------- storage/metric/tiered/memory_test.go | 4 ++- storage/metric/tiered/tiered.go | 1 + web/api/api.go | 1 + 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/storage/metric/tiered/memory.go b/storage/metric/tiered/memory.go index 9b636bb889..9341679ef9 100644 --- a/storage/metric/tiered/memory.go +++ b/storage/metric/tiered/memory.go @@ -33,7 +33,8 @@ type stream interface { add(metric.Values) clone() metric.Values - expunge(age clientmodel.Timestamp) metric.Values + getOlderThan(age clientmodel.Timestamp) metric.Values + evictOlderThan(age clientmodel.Timestamp) size() int clear() @@ -89,7 +90,19 @@ func (s *arrayStream) clone() metric.Values { return clone } -func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { +func (s *arrayStream) getOlderThan(t clientmodel.Timestamp) metric.Values { + s.RLock() + defer s.RUnlock() + + finder := func(i int) bool { + return s.values[i].Timestamp.After(t) + } + + i := sort.Search(len(s.values), finder) + return s.values[:i] +} + +func (s *arrayStream) evictOlderThan(t clientmodel.Timestamp) { s.Lock() defer s.Unlock() @@ -98,10 +111,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { } i := sort.Search(len(s.values), finder) - expunged := s.values[:i] s.values = s.values[i:] - - return expunged } func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values { @@ -282,11 +292,9 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *client } func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) { - emptySeries := []clientmodel.Fingerprint{} - s.RLock() - for fingerprint, stream := range s.fingerprintToSeries { - toArchive := stream.expunge(flushOlderThan) + for _, stream := range s.fingerprintToSeries { + toArchive := stream.getOlderThan(flushOlderThan) queued := make(clientmodel.Samples, 0, len(toArchive)) // NOTE: This duplication will go away soon. for _, value := range toArchive { @@ -303,20 +311,29 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue if len(queued) > 0 { queue <- queued } + } + s.RUnlock() +} +func (s *memorySeriesStorage) Evict(flushOlderThan clientmodel.Timestamp) { + emptySeries := []clientmodel.Fingerprint{} + + s.RLock() + for fingerprint, stream := range s.fingerprintToSeries { + stream.evictOlderThan(flushOlderThan) if stream.size() == 0 { emptySeries = append(emptySeries, fingerprint) } } s.RUnlock() + s.Lock() for _, fingerprint := range emptySeries { if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 { - s.Lock() s.dropSeries(&fingerprint) - s.Unlock() } } + s.Unlock() } // Drop a label value from the label names to label values index. diff --git a/storage/metric/tiered/memory_test.go b/storage/metric/tiered/memory_test.go index b3b52c8fe9..a6a53bb02c 100644 --- a/storage/metric/tiered/memory_test.go +++ b/storage/metric/tiered/memory_test.go @@ -171,7 +171,8 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { } toDisk := make(chan clientmodel.Samples, 2) - s.Flush(clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)), toDisk) + flushOlderThan := clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)) + s.Flush(flushOlderThan, toDisk) if len(toDisk) != 1 { t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk)) } @@ -179,6 +180,7 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { if len(diskSamples) != 1 { t.Fatalf("Got %d disk samples, expected 1", len(diskSamples)) } + s.Evict(flushOlderThan) fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common)) if err != nil { diff --git a/storage/metric/tiered/tiered.go b/storage/metric/tiered/tiered.go index e0bd6031c3..cc02e84805 100644 --- a/storage/metric/tiered/tiered.go +++ b/storage/metric/tiered/tiered.go @@ -364,6 +364,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) { glog.Infof("Writing %d samples...", len(samples)) t.DiskStorage.AppendSamples(samples) } + t.memoryArena.Evict(flushOlderThan) glog.Info("Done flushing.") } diff --git a/web/api/api.go b/web/api/api.go index 3d749380b2..4c42ba6ec5 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -15,6 +15,7 @@ package api import ( "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config"