mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-12 16:44:05 -08:00
Change Get* methods to receive fingerprints instead of metrics.
This commit is contained in:
parent
20c5ca1d72
commit
6001d22f87
|
@ -30,21 +30,23 @@ type PersistenceAdapter struct {
|
||||||
// AST-global persistence to use.
|
// AST-global persistence to use.
|
||||||
var persistenceAdapter *PersistenceAdapter = nil
|
var persistenceAdapter *PersistenceAdapter = nil
|
||||||
|
|
||||||
func (p *PersistenceAdapter) getMetricsWithLabels(labels model.LabelSet) (metrics []model.Metric, err error) {
|
func (p *PersistenceAdapter) getMetricsWithLabels(labels model.LabelSet) (fingerprintToMetric map[model.Fingerprint]model.Metric, err error) {
|
||||||
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels)
|
fingerprints, err := p.persistence.GetFingerprintsForLabelSet(labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
fingerprintToMetric = make(map[model.Fingerprint]model.Metric)
|
||||||
for _, fingerprint := range fingerprints {
|
for _, fingerprint := range fingerprints {
|
||||||
metric, err := p.persistence.GetMetricForFingerprint(fingerprint)
|
var metric *model.Metric // Don't shadow err.
|
||||||
|
metric, err = p.persistence.GetMetricForFingerprint(fingerprint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metrics, err
|
return
|
||||||
}
|
}
|
||||||
if metric == nil {
|
if metric == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics = append(metrics, *metric)
|
fingerprintToMetric[fingerprint] = *metric
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -56,8 +58,8 @@ func (p *PersistenceAdapter) GetValueAtTime(labels model.LabelSet, timestamp *ti
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
samples := []*model.Sample{}
|
samples := []*model.Sample{}
|
||||||
for _, metric := range metrics {
|
for fingerprint := range metrics {
|
||||||
sample, err := p.persistence.GetValueAtTime(metric, *timestamp, *p.stalenessPolicy)
|
sample, err := p.persistence.GetValueAtTime(fingerprint, *timestamp, *p.stalenessPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -76,9 +78,9 @@ func (p *PersistenceAdapter) GetBoundaryValues(labels model.LabelSet, interval *
|
||||||
}
|
}
|
||||||
|
|
||||||
sampleSets := []*model.SampleSet{}
|
sampleSets := []*model.SampleSet{}
|
||||||
for _, metric := range metrics {
|
for fingerprint, metric := range metrics {
|
||||||
// TODO: change to GetBoundaryValues() once it has the right return type.
|
// TODO: change to GetBoundaryValues() once it has the right return type.
|
||||||
sampleSet, err := p.persistence.GetRangeValues(metric, *interval)
|
sampleSet, err := p.persistence.GetRangeValues(fingerprint, *interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -100,8 +102,8 @@ func (p *PersistenceAdapter) GetRangeValues(labels model.LabelSet, interval *mod
|
||||||
}
|
}
|
||||||
|
|
||||||
sampleSets := []*model.SampleSet{}
|
sampleSets := []*model.SampleSet{}
|
||||||
for _, metric := range metrics {
|
for fingerprint, metric := range metrics {
|
||||||
sampleSet, err := p.persistence.GetRangeValues(metric, *interval)
|
sampleSet, err := p.persistence.GetRangeValues(fingerprint, *interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -271,7 +271,7 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
||||||
}
|
}
|
||||||
|
|
||||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||||
sample, err := p.GetValueAtTime(metric, time, StalenessPolicy{})
|
sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -334,7 +334,7 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
|
||||||
}
|
}
|
||||||
|
|
||||||
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
|
||||||
sample, err := p.GetValueAtTime(metric, time, StalenessPolicy{})
|
sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,9 +46,9 @@ type MetricPersistence interface {
|
||||||
|
|
||||||
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
|
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
|
||||||
|
|
||||||
GetValueAtTime(model.Metric, time.Time, StalenessPolicy) (*model.Sample, error)
|
GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error)
|
||||||
GetBoundaryValues(model.Metric, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error)
|
GetBoundaryValues(model.Fingerprint, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error)
|
||||||
GetRangeValues(model.Metric, model.Interval) (*model.SampleSet, error)
|
GetRangeValues(model.Fingerprint, model.Interval) (*model.SampleSet, error)
|
||||||
|
|
||||||
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
ForEachSample(IteratorsForFingerprintBuilder) (err error)
|
||||||
|
|
||||||
|
|
|
@ -900,7 +900,7 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LevelDBMetricPersistence) GetBoundaryValues(m model.Metric, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) {
|
func (l *LevelDBMetricPersistence) GetBoundaryValues(fp model.Fingerprint, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -910,14 +910,14 @@ func (l *LevelDBMetricPersistence) GetBoundaryValues(m model.Metric, i model.Int
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// XXX: Maybe we will want to emit incomplete sets?
|
// XXX: Maybe we will want to emit incomplete sets?
|
||||||
open, err = l.GetValueAtTime(m, i.OldestInclusive, s)
|
open, err = l.GetValueAtTime(fp, i.OldestInclusive, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
} else if open == nil {
|
} else if open == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
end, err = l.GetValueAtTime(m, i.NewestInclusive, s)
|
end, err = l.GetValueAtTime(fp, i.NewestInclusive, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
} else if end == nil {
|
} else if end == nil {
|
||||||
|
@ -949,7 +949,7 @@ type iterator interface {
|
||||||
Value() []byte
|
Value() []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) {
|
func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -958,11 +958,15 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s
|
||||||
recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
f := model.NewFingerprintFromMetric(m).ToDTO()
|
// TODO: memoize/cache this or change the return type to metric.SamplePair.
|
||||||
|
m, err := l.GetMetricForFingerprint(fp)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Candidate for Refactoring
|
// Candidate for Refactoring
|
||||||
k := &dto.SampleKey{
|
k := &dto.SampleKey{
|
||||||
Fingerprint: f,
|
Fingerprint: fp.ToDTO(),
|
||||||
Timestamp: indexable.EncodeTime(t),
|
Timestamp: indexable.EncodeTime(t),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1096,7 +1100,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sample = model.SampleFromDTO(&m, &t, firstValue)
|
sample = model.SampleFromDTO(m, &t, firstValue)
|
||||||
|
|
||||||
if firstDelta == time.Duration(0) {
|
if firstDelta == time.Duration(0) {
|
||||||
return
|
return
|
||||||
|
@ -1160,12 +1164,12 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(m model.Metric, t time.Time, s
|
||||||
sampleValue := &dto.SampleValueSeries{}
|
sampleValue := &dto.SampleValueSeries{}
|
||||||
sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated})
|
sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated})
|
||||||
|
|
||||||
sample = model.SampleFromDTO(&m, &t, sampleValue)
|
sample = model.SampleFromDTO(m, &t, sampleValue)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interval) (v *model.SampleSet, err error) {
|
func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.Interval) (v *model.SampleSet, err error) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -1173,10 +1177,9 @@ func (l *LevelDBMetricPersistence) GetRangeValues(m model.Metric, i model.Interv
|
||||||
|
|
||||||
recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
|
recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
|
||||||
}()
|
}()
|
||||||
f := model.NewFingerprintFromMetric(m).ToDTO()
|
|
||||||
|
|
||||||
k := &dto.SampleKey{
|
k := &dto.SampleKey{
|
||||||
Fingerprint: f,
|
Fingerprint: fp.ToDTO(),
|
||||||
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -209,9 +209,8 @@ func interpolateSample(x1, x2 time.Time, y1, y2 float32, e time.Time) model.Samp
|
||||||
return model.SampleValue(y1 + (offset * dDt))
|
return model.SampleValue(y1 + (offset * dDt))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p StalenessPolicy) (sample *model.Sample, err error) {
|
func (s memorySeriesStorage) GetValueAtTime(fp model.Fingerprint, t time.Time, p StalenessPolicy) (sample *model.Sample, err error) {
|
||||||
fingerprint := model.NewFingerprintFromMetric(m)
|
series, ok := s.fingerprintToSeries[fp]
|
||||||
series, ok := s.fingerprintToSeries[fingerprint]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -225,7 +224,7 @@ func (s memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p Stale
|
||||||
if foundTime.Equal(t) {
|
if foundTime.Equal(t) {
|
||||||
value := iterator.Value().(value)
|
value := iterator.Value().(value)
|
||||||
sample = &model.Sample{
|
sample = &model.Sample{
|
||||||
Metric: m,
|
Metric: series.metric,
|
||||||
Value: value.get(),
|
Value: value.get(),
|
||||||
Timestamp: t,
|
Timestamp: t,
|
||||||
}
|
}
|
||||||
|
@ -242,7 +241,7 @@ func (s memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p Stale
|
||||||
|
|
||||||
if !iterator.Previous() {
|
if !iterator.Previous() {
|
||||||
sample = &model.Sample{
|
sample = &model.Sample{
|
||||||
Metric: m,
|
Metric: series.metric,
|
||||||
Value: iterator.Value().(value).get(),
|
Value: iterator.Value().(value).get(),
|
||||||
Timestamp: t,
|
Timestamp: t,
|
||||||
}
|
}
|
||||||
|
@ -261,7 +260,7 @@ func (s memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p Stale
|
||||||
firstValue := iterator.Value().(value).get()
|
firstValue := iterator.Value().(value).get()
|
||||||
|
|
||||||
sample = &model.Sample{
|
sample = &model.Sample{
|
||||||
Metric: m,
|
Metric: series.metric,
|
||||||
Value: interpolateSample(firstTime, secondTime, float32(firstValue), float32(secondValue), t),
|
Value: interpolateSample(firstTime, secondTime, float32(firstValue), float32(secondValue), t),
|
||||||
Timestamp: t,
|
Timestamp: t,
|
||||||
}
|
}
|
||||||
|
@ -269,15 +268,15 @@ func (s memorySeriesStorage) GetValueAtTime(m model.Metric, t time.Time, p Stale
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s memorySeriesStorage) GetBoundaryValues(m model.Metric, i model.Interval, p StalenessPolicy) (first *model.Sample, second *model.Sample, err error) {
|
func (s memorySeriesStorage) GetBoundaryValues(fp model.Fingerprint, i model.Interval, p StalenessPolicy) (first *model.Sample, second *model.Sample, err error) {
|
||||||
first, err = s.GetValueAtTime(m, i.OldestInclusive, p)
|
first, err = s.GetValueAtTime(fp, i.OldestInclusive, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
} else if first == nil {
|
} else if first == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
second, err = s.GetValueAtTime(m, i.NewestInclusive, p)
|
second, err = s.GetValueAtTime(fp, i.NewestInclusive, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
} else if second == nil {
|
} else if second == nil {
|
||||||
|
@ -287,15 +286,14 @@ func (s memorySeriesStorage) GetBoundaryValues(m model.Metric, i model.Interval,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s memorySeriesStorage) GetRangeValues(m model.Metric, i model.Interval) (samples *model.SampleSet, err error) {
|
func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interval) (samples *model.SampleSet, err error) {
|
||||||
fingerprint := model.NewFingerprintFromMetric(m)
|
series, ok := s.fingerprintToSeries[fp]
|
||||||
series, ok := s.fingerprintToSeries[fingerprint]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
samples = &model.SampleSet{
|
samples = &model.SampleSet{
|
||||||
Metric: m,
|
Metric: series.metric,
|
||||||
}
|
}
|
||||||
|
|
||||||
iterator := series.values.Seek(skipListTime(i.NewestInclusive))
|
iterator := series.values.Seek(skipListTime(i.NewestInclusive))
|
||||||
|
|
|
@ -585,7 +585,7 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, io.Closer),
|
||||||
DeltaAllowance: input.staleness,
|
DeltaAllowance: input.staleness,
|
||||||
}
|
}
|
||||||
|
|
||||||
actual, err := p.GetValueAtTime(m, time, sp)
|
actual, err := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time, sp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
||||||
}
|
}
|
||||||
|
@ -1035,7 +1035,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, io.Close
|
||||||
DeltaAllowance: input.staleness,
|
DeltaAllowance: input.staleness,
|
||||||
}
|
}
|
||||||
|
|
||||||
openValue, endValue, err := p.GetBoundaryValues(m, interval, po)
|
openValue, endValue, err := p.GetBoundaryValues(model.NewFingerprintFromMetric(m), interval, po)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
||||||
}
|
}
|
||||||
|
@ -1389,7 +1389,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, io.Closer),
|
||||||
NewestInclusive: end,
|
NewestInclusive: end,
|
||||||
}
|
}
|
||||||
|
|
||||||
values, err := p.GetRangeValues(m, in)
|
values, err := p.GetRangeValues(model.NewFingerprintFromMetric(m), in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -413,7 +413,7 @@ func StochasticTests(persistenceMaker func() MetricPersistence, t test.Tester) {
|
||||||
NewestInclusive: time.Unix(end, 0),
|
NewestInclusive: time.Unix(end, 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
samples, err := p.GetRangeValues(metric, interval)
|
samples, err := p.GetRangeValues(model.NewFingerprintFromMetric(metric), interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue