diff --git a/storage/metric/memory.go b/storage/metric/memory.go index a43f53be5c..625cf67d1d 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -41,38 +41,66 @@ func (v singletonValue) get() clientmodel.SampleValue { return clientmodel.SampleValue(v) } -type stream struct { +type stream interface { + add(...*SamplePair) + + clone() Values + expunge(age time.Time) Values + + size() int + clear() + + metric() clientmodel.Metric + + getValueAtTime(t time.Time) Values + getBoundaryValues(in Interval) Values + getRangeValues(in Interval) Values +} + +type arrayStream struct { sync.RWMutex - metric clientmodel.Metric + m clientmodel.Metric values Values } -func (s *stream) add(timestamp time.Time, value clientmodel.SampleValue) { +func (s *arrayStream) metric() clientmodel.Metric { + return s.m +} + +func (s *arrayStream) add(v ...*SamplePair) { s.Lock() defer s.Unlock() - // BUG(all): https://github.com/prometheus/prometheus/pull/265/files#r4336435. - - s.values = append(s.values, &SamplePair{ - Timestamp: timestamp.Round(time.Second).UTC(), - Value: value, - }) + s.values = append(s.values, v...) } -func (s *stream) clone() Values { +func (s *arrayStream) clone() Values { s.RLock() defer s.RUnlock() - // BUG(all): Examine COW technique. - clone := make(Values, len(s.values)) copy(clone, s.values) return clone } -func (s *stream) getValueAtTime(t time.Time) Values { +func (s *arrayStream) expunge(t time.Time) Values { + s.Lock() + defer s.Unlock() + + finder := func(i int) bool { + return s.values[i].Timestamp.After(t) + } + + i := sort.Search(len(s.values), finder) + expunged := s.values[:i] + s.values = s.values[i:] + + return expunged +} + +func (s *arrayStream) getValueAtTime(t time.Time) Values { s.RLock() defer s.RUnlock() @@ -102,7 +130,7 @@ func (s *stream) getValueAtTime(t time.Time) Values { } } -func (s *stream) getBoundaryValues(in Interval) Values { +func (s *arrayStream) getBoundaryValues(in Interval) Values { s.RLock() defer s.RUnlock() @@ -125,7 +153,7 @@ func (s *stream) getBoundaryValues(in Interval) Values { } } -func (s *stream) getRangeValues(in Interval) Values { +func (s *arrayStream) getRangeValues(in Interval) Values { s.RLock() defer s.RUnlock() @@ -143,13 +171,17 @@ func (s *stream) getRangeValues(in Interval) Values { return result } -func (s *stream) empty() bool { - return len(s.values) == 0 +func (s *arrayStream) size() int { + return len(s.values) } -func newStream(metric clientmodel.Metric) *stream { - return &stream{ - metric: metric, +func (s *arrayStream) clear() { + s.values = Values{} +} + +func newArrayStream(metric clientmodel.Metric) *arrayStream { + return &arrayStream{ + m: metric, values: make(Values, 0, initialSeriesArenaSize), } } @@ -158,7 +190,7 @@ type memorySeriesStorage struct { sync.RWMutex wmCache *WatermarkCache - fingerprintToSeries map[clientmodel.Fingerprint]*stream + fingerprintToSeries map[clientmodel.Fingerprint]stream labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints } @@ -184,7 +216,10 @@ func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error { fingerprint := &clientmodel.Fingerprint{} fingerprint.LoadFromMetric(sample.Metric) series := s.getOrCreateSeries(sample.Metric, fingerprint) - series.add(sample.Timestamp, sample.Value) + series.add(&SamplePair{ + Value: sample.Value, + Timestamp: sample.Timestamp, + }) if s.wmCache != nil { s.wmCache.Set(fingerprint, &watermarks{High: sample.Timestamp}) @@ -202,11 +237,11 @@ func (s *memorySeriesStorage) CreateEmptySeries(metric clientmodel.Metric) { s.getOrCreateSeries(metric, fingerprint) } -func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) *stream { +func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) stream { series, ok := s.fingerprintToSeries[*fingerprint] if !ok { - series = newStream(metric) + series = newArrayStream(metric) s.fingerprintToSeries[*fingerprint] = series for k, v := range metric { @@ -231,20 +266,12 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien s.RLock() for fingerprint, stream := range s.fingerprintToSeries { - finder := func(i int) bool { - return stream.values[i].Timestamp.After(flushOlderThan) - } - - stream.Lock() - - i := sort.Search(len(stream.values), finder) - toArchive := stream.values[:i] - toKeep := stream.values[i:] + toArchive := stream.expunge(flushOlderThan) queued := make(clientmodel.Samples, 0, len(toArchive)) - + // NOTE: This duplication will go away soon. for _, value := range toArchive { queued = append(queued, &clientmodel.Sample{ - Metric: stream.metric, + Metric: stream.metric(), Timestamp: value.Timestamp, Value: value.Value, }) @@ -255,18 +282,15 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien // https://github.com/prometheus/prometheus/issues/275 queue <- queued - stream.values = toKeep - - if len(toKeep) == 0 { + if stream.size() == 0 { emptySeries = append(emptySeries, fingerprint) } - stream.Unlock() } s.RUnlock() s.Lock() for _, fingerprint := range emptySeries { - if s.fingerprintToSeries[fingerprint].empty() { + if s.fingerprintToSeries[fingerprint].size() == 0 { s.dropSeries(&fingerprint) } } @@ -279,7 +303,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) { if !ok { return } - for k, v := range series.metric { + for k, v := range series.metric() { labelPair := LabelPair{ Name: k, Value: v, @@ -299,13 +323,11 @@ func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmo series, ok := s.fingerprintToSeries[*fingerprint] if !ok { - series = newStream(clientmodel.Metric{}) + series = newArrayStream(clientmodel.Metric{}) s.fingerprintToSeries[*fingerprint] = series } - for _, sample := range samples { - series.add(sample.Timestamp, sample.Value) - } + series.add(samples...) } func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet) (fingerprints clientmodel.Fingerprints, err error) { @@ -367,7 +389,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint } metric := clientmodel.Metric{} - for label, value := range series.metric { + for label, value := range series.metric() { metric[label] = value } @@ -436,7 +458,7 @@ func (s *memorySeriesStorage) Close() { s.Lock() defer s.Unlock() - s.fingerprintToSeries = map[clientmodel.Fingerprint]*stream{} + s.fingerprintToSeries = map[clientmodel.Fingerprint]stream{} s.labelPairToFingerprints = map[LabelPair]clientmodel.Fingerprints{} s.labelNameToFingerprints = map[clientmodel.LabelName]clientmodel.Fingerprints{} } @@ -447,7 +469,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa valueSet := map[clientmodel.LabelValue]bool{} for _, series := range s.fingerprintToSeries { - if value, ok := series.metric[labelName]; ok { + if value, ok := series.metric()[labelName]; ok { if !valueSet[value] { values = append(values, value) valueSet[value] = true @@ -460,7 +482,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage { return &memorySeriesStorage{ - fingerprintToSeries: make(map[clientmodel.Fingerprint]*stream), + fingerprintToSeries: make(map[clientmodel.Fingerprint]stream), labelPairToFingerprints: make(map[LabelPair]clientmodel.Fingerprints), labelNameToFingerprints: make(map[clientmodel.LabelName]clientmodel.Fingerprints), wmCache: o.WatermarkCache, diff --git a/storage/metric/memory_test.go b/storage/metric/memory_test.go index 2ddfa19425..a86a7424e2 100644 --- a/storage/metric/memory_test.go +++ b/storage/metric/memory_test.go @@ -24,12 +24,13 @@ import ( func BenchmarkStreamAdd(b *testing.B) { b.StopTimer() - s := newStream(clientmodel.Metric{}) - times := make([]time.Time, 0, b.N) - samples := make([]clientmodel.SampleValue, 0, b.N) + s := newArrayStream(clientmodel.Metric{}) + samples := make(Values, b.N) for i := 0; i < b.N; i++ { - times = append(times, time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC)) - samples = append(samples, clientmodel.SampleValue(i)) + samples = append(samples, &SamplePair{ + Timestamp: time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC), + Value: clientmodel.SampleValue(i), + }) } b.StartTimer() @@ -37,9 +38,7 @@ func BenchmarkStreamAdd(b *testing.B) { var pre runtime.MemStats runtime.ReadMemStats(&pre) - for i := 0; i < b.N; i++ { - s.add(times[i], samples[i]) - } + s.add(samples...) var post runtime.MemStats runtime.ReadMemStats(&post)