mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Interfacification of stream.
Move the stream to an interface, for a number of additional changes around it are underway. Conflicts: storage/metric/memory.go Change-Id: I4a5fc176f4a5274a64ebdb1cad52600954c463c3
This commit is contained in:
parent
c262907fec
commit
d74c2c54d4
|
@ -41,38 +41,66 @@ func (v singletonValue) get() clientmodel.SampleValue {
|
||||||
return clientmodel.SampleValue(v)
|
return clientmodel.SampleValue(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
type stream struct {
|
type stream interface {
|
||||||
|
add(...*SamplePair)
|
||||||
|
|
||||||
|
clone() Values
|
||||||
|
expunge(age time.Time) Values
|
||||||
|
|
||||||
|
size() int
|
||||||
|
clear()
|
||||||
|
|
||||||
|
metric() clientmodel.Metric
|
||||||
|
|
||||||
|
getValueAtTime(t time.Time) Values
|
||||||
|
getBoundaryValues(in Interval) Values
|
||||||
|
getRangeValues(in Interval) Values
|
||||||
|
}
|
||||||
|
|
||||||
|
type arrayStream struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
|
||||||
metric clientmodel.Metric
|
m clientmodel.Metric
|
||||||
values Values
|
values Values
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) add(timestamp time.Time, value clientmodel.SampleValue) {
|
func (s *arrayStream) metric() clientmodel.Metric {
|
||||||
|
return s.m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *arrayStream) add(v ...*SamplePair) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
// BUG(all): https://github.com/prometheus/prometheus/pull/265/files#r4336435.
|
s.values = append(s.values, v...)
|
||||||
|
|
||||||
s.values = append(s.values, &SamplePair{
|
|
||||||
Timestamp: timestamp.Round(time.Second).UTC(),
|
|
||||||
Value: value,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) clone() Values {
|
func (s *arrayStream) clone() Values {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
|
|
||||||
// BUG(all): Examine COW technique.
|
|
||||||
|
|
||||||
clone := make(Values, len(s.values))
|
clone := make(Values, len(s.values))
|
||||||
copy(clone, s.values)
|
copy(clone, s.values)
|
||||||
|
|
||||||
return clone
|
return clone
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) getValueAtTime(t time.Time) Values {
|
func (s *arrayStream) expunge(t time.Time) Values {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
|
||||||
|
finder := func(i int) bool {
|
||||||
|
return s.values[i].Timestamp.After(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
i := sort.Search(len(s.values), finder)
|
||||||
|
expunged := s.values[:i]
|
||||||
|
s.values = s.values[i:]
|
||||||
|
|
||||||
|
return expunged
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *arrayStream) getValueAtTime(t time.Time) Values {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
|
|
||||||
|
@ -102,7 +130,7 @@ func (s *stream) getValueAtTime(t time.Time) Values {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) getBoundaryValues(in Interval) Values {
|
func (s *arrayStream) getBoundaryValues(in Interval) Values {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
|
|
||||||
|
@ -125,7 +153,7 @@ func (s *stream) getBoundaryValues(in Interval) Values {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) getRangeValues(in Interval) Values {
|
func (s *arrayStream) getRangeValues(in Interval) Values {
|
||||||
s.RLock()
|
s.RLock()
|
||||||
defer s.RUnlock()
|
defer s.RUnlock()
|
||||||
|
|
||||||
|
@ -143,13 +171,17 @@ func (s *stream) getRangeValues(in Interval) Values {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stream) empty() bool {
|
func (s *arrayStream) size() int {
|
||||||
return len(s.values) == 0
|
return len(s.values)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStream(metric clientmodel.Metric) *stream {
|
func (s *arrayStream) clear() {
|
||||||
return &stream{
|
s.values = Values{}
|
||||||
metric: metric,
|
}
|
||||||
|
|
||||||
|
func newArrayStream(metric clientmodel.Metric) *arrayStream {
|
||||||
|
return &arrayStream{
|
||||||
|
m: metric,
|
||||||
values: make(Values, 0, initialSeriesArenaSize),
|
values: make(Values, 0, initialSeriesArenaSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,7 +190,7 @@ type memorySeriesStorage struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
|
||||||
wmCache *WatermarkCache
|
wmCache *WatermarkCache
|
||||||
fingerprintToSeries map[clientmodel.Fingerprint]*stream
|
fingerprintToSeries map[clientmodel.Fingerprint]stream
|
||||||
labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints
|
labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints
|
||||||
labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
|
labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
|
||||||
}
|
}
|
||||||
|
@ -184,7 +216,10 @@ func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error {
|
||||||
fingerprint := &clientmodel.Fingerprint{}
|
fingerprint := &clientmodel.Fingerprint{}
|
||||||
fingerprint.LoadFromMetric(sample.Metric)
|
fingerprint.LoadFromMetric(sample.Metric)
|
||||||
series := s.getOrCreateSeries(sample.Metric, fingerprint)
|
series := s.getOrCreateSeries(sample.Metric, fingerprint)
|
||||||
series.add(sample.Timestamp, sample.Value)
|
series.add(&SamplePair{
|
||||||
|
Value: sample.Value,
|
||||||
|
Timestamp: sample.Timestamp,
|
||||||
|
})
|
||||||
|
|
||||||
if s.wmCache != nil {
|
if s.wmCache != nil {
|
||||||
s.wmCache.Set(fingerprint, &watermarks{High: sample.Timestamp})
|
s.wmCache.Set(fingerprint, &watermarks{High: sample.Timestamp})
|
||||||
|
@ -202,11 +237,11 @@ func (s *memorySeriesStorage) CreateEmptySeries(metric clientmodel.Metric) {
|
||||||
s.getOrCreateSeries(metric, fingerprint)
|
s.getOrCreateSeries(metric, fingerprint)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) *stream {
|
func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) stream {
|
||||||
series, ok := s.fingerprintToSeries[*fingerprint]
|
series, ok := s.fingerprintToSeries[*fingerprint]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
series = newStream(metric)
|
series = newArrayStream(metric)
|
||||||
s.fingerprintToSeries[*fingerprint] = series
|
s.fingerprintToSeries[*fingerprint] = series
|
||||||
|
|
||||||
for k, v := range metric {
|
for k, v := range metric {
|
||||||
|
@ -231,20 +266,12 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien
|
||||||
|
|
||||||
s.RLock()
|
s.RLock()
|
||||||
for fingerprint, stream := range s.fingerprintToSeries {
|
for fingerprint, stream := range s.fingerprintToSeries {
|
||||||
finder := func(i int) bool {
|
toArchive := stream.expunge(flushOlderThan)
|
||||||
return stream.values[i].Timestamp.After(flushOlderThan)
|
|
||||||
}
|
|
||||||
|
|
||||||
stream.Lock()
|
|
||||||
|
|
||||||
i := sort.Search(len(stream.values), finder)
|
|
||||||
toArchive := stream.values[:i]
|
|
||||||
toKeep := stream.values[i:]
|
|
||||||
queued := make(clientmodel.Samples, 0, len(toArchive))
|
queued := make(clientmodel.Samples, 0, len(toArchive))
|
||||||
|
// NOTE: This duplication will go away soon.
|
||||||
for _, value := range toArchive {
|
for _, value := range toArchive {
|
||||||
queued = append(queued, &clientmodel.Sample{
|
queued = append(queued, &clientmodel.Sample{
|
||||||
Metric: stream.metric,
|
Metric: stream.metric(),
|
||||||
Timestamp: value.Timestamp,
|
Timestamp: value.Timestamp,
|
||||||
Value: value.Value,
|
Value: value.Value,
|
||||||
})
|
})
|
||||||
|
@ -255,18 +282,15 @@ func (s *memorySeriesStorage) Flush(flushOlderThan time.Time, queue chan<- clien
|
||||||
// https://github.com/prometheus/prometheus/issues/275
|
// https://github.com/prometheus/prometheus/issues/275
|
||||||
queue <- queued
|
queue <- queued
|
||||||
|
|
||||||
stream.values = toKeep
|
if stream.size() == 0 {
|
||||||
|
|
||||||
if len(toKeep) == 0 {
|
|
||||||
emptySeries = append(emptySeries, fingerprint)
|
emptySeries = append(emptySeries, fingerprint)
|
||||||
}
|
}
|
||||||
stream.Unlock()
|
|
||||||
}
|
}
|
||||||
s.RUnlock()
|
s.RUnlock()
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
for _, fingerprint := range emptySeries {
|
for _, fingerprint := range emptySeries {
|
||||||
if s.fingerprintToSeries[fingerprint].empty() {
|
if s.fingerprintToSeries[fingerprint].size() == 0 {
|
||||||
s.dropSeries(&fingerprint)
|
s.dropSeries(&fingerprint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,7 +303,7 @@ func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for k, v := range series.metric {
|
for k, v := range series.metric() {
|
||||||
labelPair := LabelPair{
|
labelPair := LabelPair{
|
||||||
Name: k,
|
Name: k,
|
||||||
Value: v,
|
Value: v,
|
||||||
|
@ -299,13 +323,11 @@ func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmo
|
||||||
series, ok := s.fingerprintToSeries[*fingerprint]
|
series, ok := s.fingerprintToSeries[*fingerprint]
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
series = newStream(clientmodel.Metric{})
|
series = newArrayStream(clientmodel.Metric{})
|
||||||
s.fingerprintToSeries[*fingerprint] = series
|
s.fingerprintToSeries[*fingerprint] = series
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sample := range samples {
|
series.add(samples...)
|
||||||
series.add(sample.Timestamp, sample.Value)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet) (fingerprints clientmodel.Fingerprints, err error) {
|
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet) (fingerprints clientmodel.Fingerprints, err error) {
|
||||||
|
@ -367,7 +389,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint
|
||||||
}
|
}
|
||||||
|
|
||||||
metric := clientmodel.Metric{}
|
metric := clientmodel.Metric{}
|
||||||
for label, value := range series.metric {
|
for label, value := range series.metric() {
|
||||||
metric[label] = value
|
metric[label] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -436,7 +458,7 @@ func (s *memorySeriesStorage) Close() {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
s.fingerprintToSeries = map[clientmodel.Fingerprint]*stream{}
|
s.fingerprintToSeries = map[clientmodel.Fingerprint]stream{}
|
||||||
s.labelPairToFingerprints = map[LabelPair]clientmodel.Fingerprints{}
|
s.labelPairToFingerprints = map[LabelPair]clientmodel.Fingerprints{}
|
||||||
s.labelNameToFingerprints = map[clientmodel.LabelName]clientmodel.Fingerprints{}
|
s.labelNameToFingerprints = map[clientmodel.LabelName]clientmodel.Fingerprints{}
|
||||||
}
|
}
|
||||||
|
@ -447,7 +469,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa
|
||||||
|
|
||||||
valueSet := map[clientmodel.LabelValue]bool{}
|
valueSet := map[clientmodel.LabelValue]bool{}
|
||||||
for _, series := range s.fingerprintToSeries {
|
for _, series := range s.fingerprintToSeries {
|
||||||
if value, ok := series.metric[labelName]; ok {
|
if value, ok := series.metric()[labelName]; ok {
|
||||||
if !valueSet[value] {
|
if !valueSet[value] {
|
||||||
values = append(values, value)
|
values = append(values, value)
|
||||||
valueSet[value] = true
|
valueSet[value] = true
|
||||||
|
@ -460,7 +482,7 @@ func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelNa
|
||||||
|
|
||||||
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
|
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
|
||||||
return &memorySeriesStorage{
|
return &memorySeriesStorage{
|
||||||
fingerprintToSeries: make(map[clientmodel.Fingerprint]*stream),
|
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),
|
||||||
labelPairToFingerprints: make(map[LabelPair]clientmodel.Fingerprints),
|
labelPairToFingerprints: make(map[LabelPair]clientmodel.Fingerprints),
|
||||||
labelNameToFingerprints: make(map[clientmodel.LabelName]clientmodel.Fingerprints),
|
labelNameToFingerprints: make(map[clientmodel.LabelName]clientmodel.Fingerprints),
|
||||||
wmCache: o.WatermarkCache,
|
wmCache: o.WatermarkCache,
|
||||||
|
|
|
@ -24,12 +24,13 @@ import (
|
||||||
|
|
||||||
func BenchmarkStreamAdd(b *testing.B) {
|
func BenchmarkStreamAdd(b *testing.B) {
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
s := newStream(clientmodel.Metric{})
|
s := newArrayStream(clientmodel.Metric{})
|
||||||
times := make([]time.Time, 0, b.N)
|
samples := make(Values, b.N)
|
||||||
samples := make([]clientmodel.SampleValue, 0, b.N)
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
times = append(times, time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC))
|
samples = append(samples, &SamplePair{
|
||||||
samples = append(samples, clientmodel.SampleValue(i))
|
Timestamp: time.Date(i, 0, 0, 0, 0, 0, 0, time.UTC),
|
||||||
|
Value: clientmodel.SampleValue(i),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
@ -37,9 +38,7 @@ func BenchmarkStreamAdd(b *testing.B) {
|
||||||
var pre runtime.MemStats
|
var pre runtime.MemStats
|
||||||
runtime.ReadMemStats(&pre)
|
runtime.ReadMemStats(&pre)
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
s.add(samples...)
|
||||||
s.add(times[i], samples[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
var post runtime.MemStats
|
var post runtime.MemStats
|
||||||
runtime.ReadMemStats(&post)
|
runtime.ReadMemStats(&post)
|
||||||
|
|
Loading…
Reference in a new issue