diff --git a/storage/metric/tiered/memory.go b/storage/metric/tiered/memory.go index 9b636bb88..9341679ef 100644 --- a/storage/metric/tiered/memory.go +++ b/storage/metric/tiered/memory.go @@ -33,7 +33,8 @@ type stream interface { add(metric.Values) clone() metric.Values - expunge(age clientmodel.Timestamp) metric.Values + getOlderThan(age clientmodel.Timestamp) metric.Values + evictOlderThan(age clientmodel.Timestamp) size() int clear() @@ -89,7 +90,19 @@ func (s *arrayStream) clone() metric.Values { return clone } -func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { +func (s *arrayStream) getOlderThan(t clientmodel.Timestamp) metric.Values { + s.RLock() + defer s.RUnlock() + + finder := func(i int) bool { + return s.values[i].Timestamp.After(t) + } + + i := sort.Search(len(s.values), finder) + return s.values[:i] +} + +func (s *arrayStream) evictOlderThan(t clientmodel.Timestamp) { s.Lock() defer s.Unlock() @@ -98,10 +111,7 @@ func (s *arrayStream) expunge(t clientmodel.Timestamp) metric.Values { } i := sort.Search(len(s.values), finder) - expunged := s.values[:i] s.values = s.values[i:] - - return expunged } func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) metric.Values { @@ -282,11 +292,9 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric, fp *client } func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) { - emptySeries := []clientmodel.Fingerprint{} - s.RLock() - for fingerprint, stream := range s.fingerprintToSeries { - toArchive := stream.expunge(flushOlderThan) + for _, stream := range s.fingerprintToSeries { + toArchive := stream.getOlderThan(flushOlderThan) queued := make(clientmodel.Samples, 0, len(toArchive)) // NOTE: This duplication will go away soon. for _, value := range toArchive { @@ -303,20 +311,29 @@ func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue if len(queued) > 0 { queue <- queued } + } + s.RUnlock() +} +func (s *memorySeriesStorage) Evict(flushOlderThan clientmodel.Timestamp) { + emptySeries := []clientmodel.Fingerprint{} + + s.RLock() + for fingerprint, stream := range s.fingerprintToSeries { + stream.evictOlderThan(flushOlderThan) if stream.size() == 0 { emptySeries = append(emptySeries, fingerprint) } } s.RUnlock() + s.Lock() for _, fingerprint := range emptySeries { if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 { - s.Lock() s.dropSeries(&fingerprint) - s.Unlock() } } + s.Unlock() } // Drop a label value from the label names to label values index. diff --git a/storage/metric/tiered/memory_test.go b/storage/metric/tiered/memory_test.go index b3b52c8fe..a6a53bb02 100644 --- a/storage/metric/tiered/memory_test.go +++ b/storage/metric/tiered/memory_test.go @@ -171,7 +171,8 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { } toDisk := make(chan clientmodel.Samples, 2) - s.Flush(clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)), toDisk) + flushOlderThan := clientmodel.TimestampFromTime(time.Date(2001, 0, 0, 0, 0, 0, 0, time.UTC)) + s.Flush(flushOlderThan, toDisk) if len(toDisk) != 1 { t.Fatalf("Got %d disk sample lists, expected 1", len(toDisk)) } @@ -179,6 +180,7 @@ func TestDroppedSeriesIndexRegression(t *testing.T) { if len(diskSamples) != 1 { t.Fatalf("Got %d disk samples, expected 1", len(diskSamples)) } + s.Evict(flushOlderThan) fps, err = s.GetFingerprintsForLabelMatchers(labelMatchersFromLabelSet(common)) if err != nil { diff --git a/storage/metric/tiered/tiered.go b/storage/metric/tiered/tiered.go index e0bd6031c..cc02e8480 100644 --- a/storage/metric/tiered/tiered.go +++ b/storage/metric/tiered/tiered.go @@ -364,6 +364,7 @@ func (t *TieredStorage) flushMemory(ttl time.Duration) { glog.Infof("Writing %d samples...", len(samples)) t.DiskStorage.AppendSamples(samples) } + t.memoryArena.Evict(flushOlderThan) glog.Info("Done flushing.") } diff --git a/web/api/api.go b/web/api/api.go index 3d749380b..4c42ba6ec 100644 --- a/web/api/api.go +++ b/web/api/api.go @@ -15,6 +15,7 @@ package api import ( "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config"