mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-27 13:42:31 -08:00
fd2158e746
Problem description: ==================== If a rule evaluation referencing a metric/timeseries M happens at a time when M doesn't have a memory timeseries yet, looking up the fingerprint for M (via TieredStorage.GetMetricForFingerprint()) will create a new Metric object for M which gets both: a) attached to a new empty memory timeseries (so we don't have to ask disk for the Metric's fingerprint next time), and b) returned to the rule evaluation layer. However, the rule evaluation layer replaces the name label (and possibly other labels) of the metric with the name of the recorded rule. Since both the rule evaluator and the memory storage share a reference to the same Metric object, the original memory timeseries will now also be incorrectly renamed. Fix: ==== Instead of storing a reference to a shared metric object, take a copy of the object when creating an empty memory timeseries for caching purposes. Change-Id: I9f2172696c16c10b377e6708553a46ef29390f1e
496 lines
12 KiB
Go
496 lines
12 KiB
Go
// Copyright 2013 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model"
|
|
|
|
"github.com/prometheus/prometheus/utility"
|
|
)
|
|
|
|
// Assuming sample rate of 1 / 15Hz, this allows for one hour's worth of
|
|
// storage per metric without any major reallocations.
|
|
const initialSeriesArenaSize = 4 * 60
|
|
|
|
// Models a given sample entry stored in the in-memory arena.
|
|
type value interface {
|
|
// Gets the given value.
|
|
get() clientmodel.SampleValue
|
|
}
|
|
|
|
// Models a single sample value. It presumes that there is either no subsequent
|
|
// value seen or that any subsequent values are of a different value.
|
|
type singletonValue clientmodel.SampleValue
|
|
|
|
func (v singletonValue) get() clientmodel.SampleValue {
|
|
return clientmodel.SampleValue(v)
|
|
}
|
|
|
|
type stream interface {
|
|
add(...*SamplePair)
|
|
|
|
clone() Values
|
|
expunge(age clientmodel.Timestamp) Values
|
|
|
|
size() int
|
|
clear()
|
|
|
|
metric() clientmodel.Metric
|
|
|
|
getValueAtTime(t clientmodel.Timestamp) Values
|
|
getBoundaryValues(in Interval) Values
|
|
getRangeValues(in Interval) Values
|
|
}
|
|
|
|
type arrayStream struct {
|
|
sync.RWMutex
|
|
|
|
m clientmodel.Metric
|
|
values Values
|
|
}
|
|
|
|
func (s *arrayStream) metric() clientmodel.Metric {
|
|
return s.m
|
|
}
|
|
|
|
func (s *arrayStream) add(v ...*SamplePair) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.values = append(s.values, v...)
|
|
}
|
|
|
|
func (s *arrayStream) clone() Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
clone := make(Values, len(s.values))
|
|
copy(clone, s.values)
|
|
|
|
return clone
|
|
}
|
|
|
|
func (s *arrayStream) expunge(t clientmodel.Timestamp) Values {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
finder := func(i int) bool {
|
|
return s.values[i].Timestamp.After(t)
|
|
}
|
|
|
|
i := sort.Search(len(s.values), finder)
|
|
expunged := s.values[:i]
|
|
s.values = s.values[i:]
|
|
|
|
return expunged
|
|
}
|
|
|
|
func (s *arrayStream) getValueAtTime(t clientmodel.Timestamp) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
// BUG(all): May be avenues for simplification.
|
|
l := len(s.values)
|
|
switch l {
|
|
case 0:
|
|
return Values{}
|
|
case 1:
|
|
return Values{s.values[0]}
|
|
default:
|
|
index := sort.Search(l, func(i int) bool {
|
|
return !s.values[i].Timestamp.Before(t)
|
|
})
|
|
|
|
if index == 0 {
|
|
return Values{s.values[0]}
|
|
}
|
|
if index == l {
|
|
return Values{s.values[l-1]}
|
|
}
|
|
|
|
if s.values[index].Timestamp.Equal(t) {
|
|
return Values{s.values[index]}
|
|
}
|
|
return Values{s.values[index-1], s.values[index]}
|
|
}
|
|
}
|
|
|
|
func (s *arrayStream) getBoundaryValues(in Interval) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
oldest := sort.Search(len(s.values), func(i int) bool {
|
|
return !s.values[i].Timestamp.Before(in.OldestInclusive)
|
|
})
|
|
|
|
newest := sort.Search(len(s.values), func(i int) bool {
|
|
return s.values[i].Timestamp.After(in.NewestInclusive)
|
|
})
|
|
|
|
resultRange := s.values[oldest:newest]
|
|
switch len(resultRange) {
|
|
case 0:
|
|
return Values{}
|
|
case 1:
|
|
return Values{resultRange[0]}
|
|
default:
|
|
return Values{resultRange[0], resultRange[len(resultRange)-1]}
|
|
}
|
|
}
|
|
|
|
func (s *arrayStream) getRangeValues(in Interval) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
oldest := sort.Search(len(s.values), func(i int) bool {
|
|
return !s.values[i].Timestamp.Before(in.OldestInclusive)
|
|
})
|
|
|
|
newest := sort.Search(len(s.values), func(i int) bool {
|
|
return s.values[i].Timestamp.After(in.NewestInclusive)
|
|
})
|
|
|
|
result := make(Values, newest-oldest)
|
|
copy(result, s.values[oldest:newest])
|
|
|
|
return result
|
|
}
|
|
|
|
func (s *arrayStream) size() int {
|
|
return len(s.values)
|
|
}
|
|
|
|
func (s *arrayStream) clear() {
|
|
s.values = Values{}
|
|
}
|
|
|
|
func newArrayStream(metric clientmodel.Metric) *arrayStream {
|
|
return &arrayStream{
|
|
m: metric,
|
|
values: make(Values, 0, initialSeriesArenaSize),
|
|
}
|
|
}
|
|
|
|
type memorySeriesStorage struct {
|
|
sync.RWMutex
|
|
|
|
wmCache *watermarkCache
|
|
fingerprintToSeries map[clientmodel.Fingerprint]stream
|
|
labelPairToFingerprints map[LabelPair]clientmodel.Fingerprints
|
|
labelNameToFingerprints map[clientmodel.LabelName]clientmodel.Fingerprints
|
|
}
|
|
|
|
type MemorySeriesOptions struct {
|
|
// If provided, this WatermarkCache will be updated for any samples that are
|
|
// appended to the memorySeriesStorage.
|
|
WatermarkCache *watermarkCache
|
|
}
|
|
|
|
func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) error {
|
|
for _, sample := range samples {
|
|
s.AppendSample(sample)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *memorySeriesStorage) AppendSample(sample *clientmodel.Sample) error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
fingerprint := &clientmodel.Fingerprint{}
|
|
fingerprint.LoadFromMetric(sample.Metric)
|
|
series := s.getOrCreateSeries(sample.Metric, fingerprint)
|
|
series.add(&SamplePair{
|
|
Value: sample.Value,
|
|
Timestamp: sample.Timestamp,
|
|
})
|
|
|
|
if s.wmCache != nil {
|
|
s.wmCache.Put(fingerprint, &watermarks{High: sample.Timestamp})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *memorySeriesStorage) CreateEmptySeries(metric clientmodel.Metric) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
m := clientmodel.Metric{}
|
|
for label, value := range metric {
|
|
m[label] = value
|
|
}
|
|
|
|
fingerprint := &clientmodel.Fingerprint{}
|
|
fingerprint.LoadFromMetric(m)
|
|
s.getOrCreateSeries(m, fingerprint)
|
|
}
|
|
|
|
func (s *memorySeriesStorage) getOrCreateSeries(metric clientmodel.Metric, fingerprint *clientmodel.Fingerprint) stream {
|
|
series, ok := s.fingerprintToSeries[*fingerprint]
|
|
|
|
if !ok {
|
|
series = newArrayStream(metric)
|
|
s.fingerprintToSeries[*fingerprint] = series
|
|
|
|
for k, v := range metric {
|
|
labelPair := LabelPair{
|
|
Name: k,
|
|
Value: v,
|
|
}
|
|
labelPairValues := s.labelPairToFingerprints[labelPair]
|
|
labelPairValues = append(labelPairValues, fingerprint)
|
|
s.labelPairToFingerprints[labelPair] = labelPairValues
|
|
|
|
labelNameValues := s.labelNameToFingerprints[k]
|
|
labelNameValues = append(labelNameValues, fingerprint)
|
|
s.labelNameToFingerprints[k] = labelNameValues
|
|
}
|
|
}
|
|
return series
|
|
}
|
|
|
|
func (s *memorySeriesStorage) Flush(flushOlderThan clientmodel.Timestamp, queue chan<- clientmodel.Samples) {
|
|
emptySeries := []clientmodel.Fingerprint{}
|
|
|
|
s.RLock()
|
|
for fingerprint, stream := range s.fingerprintToSeries {
|
|
toArchive := stream.expunge(flushOlderThan)
|
|
queued := make(clientmodel.Samples, 0, len(toArchive))
|
|
// NOTE: This duplication will go away soon.
|
|
for _, value := range toArchive {
|
|
queued = append(queued, &clientmodel.Sample{
|
|
Metric: stream.metric(),
|
|
Timestamp: value.Timestamp,
|
|
Value: value.Value,
|
|
})
|
|
}
|
|
|
|
// BUG(all): this can deadlock if the queue is full, as we only ever clear
|
|
// the queue after calling this method:
|
|
// https://github.com/prometheus/prometheus/issues/275
|
|
queue <- queued
|
|
|
|
if stream.size() == 0 {
|
|
emptySeries = append(emptySeries, fingerprint)
|
|
}
|
|
}
|
|
s.RUnlock()
|
|
|
|
for _, fingerprint := range emptySeries {
|
|
if series, ok := s.fingerprintToSeries[fingerprint]; ok && series.size() == 0 {
|
|
s.Lock()
|
|
s.dropSeries(&fingerprint)
|
|
s.Unlock()
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// Drop all references to a series, including any samples.
|
|
func (s *memorySeriesStorage) dropSeries(fingerprint *clientmodel.Fingerprint) {
|
|
series, ok := s.fingerprintToSeries[*fingerprint]
|
|
if !ok {
|
|
return
|
|
}
|
|
for k, v := range series.metric() {
|
|
labelPair := LabelPair{
|
|
Name: k,
|
|
Value: v,
|
|
}
|
|
delete(s.labelPairToFingerprints, labelPair)
|
|
delete(s.labelNameToFingerprints, k)
|
|
}
|
|
delete(s.fingerprintToSeries, *fingerprint)
|
|
}
|
|
|
|
// Append raw samples, bypassing indexing. Only used to add data to views,
|
|
// which don't need to lookup by metric.
|
|
func (s *memorySeriesStorage) appendSamplesWithoutIndexing(fingerprint *clientmodel.Fingerprint, samples Values) {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*fingerprint]
|
|
|
|
if !ok {
|
|
series = newArrayStream(clientmodel.Metric{})
|
|
s.fingerprintToSeries[*fingerprint] = series
|
|
}
|
|
|
|
series.add(samples...)
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetFingerprintsForLabelSet(l clientmodel.LabelSet) (fingerprints clientmodel.Fingerprints, err error) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
sets := []utility.Set{}
|
|
for k, v := range l {
|
|
values := s.labelPairToFingerprints[LabelPair{
|
|
Name: k,
|
|
Value: v,
|
|
}]
|
|
set := utility.Set{}
|
|
for _, fingerprint := range values {
|
|
set.Add(*fingerprint)
|
|
}
|
|
sets = append(sets, set)
|
|
}
|
|
|
|
setCount := len(sets)
|
|
if setCount == 0 {
|
|
return fingerprints, nil
|
|
}
|
|
|
|
base := sets[0]
|
|
for i := 1; i < setCount; i++ {
|
|
base = base.Intersection(sets[i])
|
|
}
|
|
for _, e := range base.Elements() {
|
|
fingerprint := e.(clientmodel.Fingerprint)
|
|
fingerprints = append(fingerprints, &fingerprint)
|
|
}
|
|
|
|
return fingerprints, nil
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetFingerprintsForLabelName(l clientmodel.LabelName) (clientmodel.Fingerprints, error) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
values, ok := s.labelNameToFingerprints[l]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
|
|
fingerprints := make(clientmodel.Fingerprints, len(values))
|
|
copy(fingerprints, values)
|
|
|
|
return fingerprints, nil
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetMetricForFingerprint(f *clientmodel.Fingerprint) (clientmodel.Metric, error) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*f]
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
|
|
metric := clientmodel.Metric{}
|
|
for label, value := range series.metric() {
|
|
metric[label] = value
|
|
}
|
|
|
|
return metric, nil
|
|
}
|
|
|
|
func (s *memorySeriesStorage) HasFingerprint(f *clientmodel.Fingerprint) bool {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
_, has := s.fingerprintToSeries[*f]
|
|
|
|
return has
|
|
}
|
|
|
|
func (s *memorySeriesStorage) CloneSamples(f *clientmodel.Fingerprint) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*f]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return series.clone()
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetValueAtTime(f *clientmodel.Fingerprint, t clientmodel.Timestamp) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*f]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return series.getValueAtTime(t)
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetBoundaryValues(f *clientmodel.Fingerprint, i Interval) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*f]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return series.getBoundaryValues(i)
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetRangeValues(f *clientmodel.Fingerprint, i Interval) Values {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
series, ok := s.fingerprintToSeries[*f]
|
|
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return series.getRangeValues(i)
|
|
}
|
|
|
|
func (s *memorySeriesStorage) Close() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
s.fingerprintToSeries = map[clientmodel.Fingerprint]stream{}
|
|
s.labelPairToFingerprints = map[LabelPair]clientmodel.Fingerprints{}
|
|
s.labelNameToFingerprints = map[clientmodel.LabelName]clientmodel.Fingerprints{}
|
|
}
|
|
|
|
func (s *memorySeriesStorage) GetAllValuesForLabel(labelName clientmodel.LabelName) (values clientmodel.LabelValues, err error) {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
|
|
valueSet := map[clientmodel.LabelValue]bool{}
|
|
for _, series := range s.fingerprintToSeries {
|
|
if value, ok := series.metric()[labelName]; ok {
|
|
if !valueSet[value] {
|
|
values = append(values, value)
|
|
valueSet[value] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func NewMemorySeriesStorage(o MemorySeriesOptions) *memorySeriesStorage {
|
|
return &memorySeriesStorage{
|
|
fingerprintToSeries: make(map[clientmodel.Fingerprint]stream),
|
|
labelPairToFingerprints: make(map[LabelPair]clientmodel.Fingerprints),
|
|
labelNameToFingerprints: make(map[clientmodel.LabelName]clientmodel.Fingerprints),
|
|
wmCache: o.WatermarkCache,
|
|
}
|
|
}
|