Merge pull request #160 from prometheus/julius-refactor-persistence

Make view use memory persistence, remove obsolete code.
This commit is contained in:
juliusv 2013-04-18 15:18:30 -07:00
commit 30c7acfaa4
9 changed files with 364 additions and 1196 deletions

View file

@ -26,9 +26,16 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal
// (i.e. metric->fingerprint lookups). // (i.e. metric->fingerprint lookups).
var queryStorage metric.Storage = nil var queryStorage metric.Storage = nil
// Describes the lenience limits to apply to values from the materialized view.
type StalenessPolicy struct {
// Describes the inclusive limit at which individual points if requested will
// be matched and subject to interpolation.
DeltaAllowance time.Duration
}
type viewAdapter struct { type viewAdapter struct {
view metric.View view metric.View
stalenessPolicy *metric.StalenessPolicy stalenessPolicy StalenessPolicy
} }
// interpolateSamples interpolates a value at a target time between two // interpolateSamples interpolates a value at a target time between two
@ -165,12 +172,12 @@ func SetStorage(storage metric.Storage) {
} }
func NewViewAdapter(view metric.View) *viewAdapter { func NewViewAdapter(view metric.View) *viewAdapter {
stalenessPolicy := metric.StalenessPolicy{ stalenessPolicy := StalenessPolicy{
DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second,
} }
return &viewAdapter{ return &viewAdapter{
view: view, view: view,
stalenessPolicy: &stalenessPolicy, stalenessPolicy: stalenessPolicy,
} }
} }

View file

@ -271,21 +271,20 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) {
} }
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) samples := p.GetValueAtTime(fingerprints[0], time)
if err != nil { if len(samples) == 0 {
t.Fatal(err) t.Fatal("expected at least one sample.")
}
if sample == nil {
t.Fatal("expected non-nil sample.")
} }
expected := model.SampleValue(i) expected := model.SampleValue(i)
for _, sample := range samples {
if sample.Value != expected { if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value) t.Fatalf("expected %d value, got %d", expected, sample.Value)
} }
} }
} }
}
} }
func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
@ -334,21 +333,20 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) {
} }
time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second)
sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) samples := p.GetValueAtTime(fingerprints[0], time)
if err != nil { if len(samples) == 0 {
t.Fatal(err) t.Fatal("expected at least one sample.")
}
if sample == nil {
t.Fatal("expected non-nil sample.")
} }
expected := model.SampleValue(i) expected := model.SampleValue(i)
for _, sample := range samples {
if sample.Value != expected { if sample.Value != expected {
t.Fatalf("expected %d value, got %d", expected, sample.Value) t.Fatalf("expected %d value, got %d", expected, sample.Value)
} }
} }
} }
}
} }
// Test Definitions Below // Test Definitions Below

View file

@ -47,9 +47,9 @@ type MetricPersistence interface {
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.
GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error)
GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error) GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair
GetBoundaryValues(model.Fingerprint, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error) GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair)
GetRangeValues(model.Fingerprint, model.Interval) (*model.SampleSet, error) GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair
ForEachSample(IteratorsForFingerprintBuilder) (err error) ForEachSample(IteratorsForFingerprintBuilder) (err error)
@ -61,13 +61,6 @@ type MetricPersistence interface {
// MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) // MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error)
} }
// Describes the lenience limits for querying the materialized View.
type StalenessPolicy struct {
// Describes the inclusive limit at which individual points if requested will
// be matched and subject to interpolation.
DeltaAllowance time.Duration
}
// View provides view of the values in the datastore subject to the request of a // View provides view of the values in the datastore subject to the request of a
// preloading operation. // preloading operation.
type View interface { type View interface {

View file

@ -862,322 +862,16 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint)
return return
} }
func (l *LevelDBMetricPersistence) GetBoundaryValues(fp model.Fingerprint, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) { func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) {
begin := time.Now() panic("Not implemented")
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure})
}()
// XXX: Maybe we will want to emit incomplete sets?
open, err = l.GetValueAtTime(fp, i.OldestInclusive, s)
if err != nil {
return
} else if open == nil {
return
}
end, err = l.GetValueAtTime(fp, i.NewestInclusive, s)
if err != nil {
return
} else if end == nil {
open = nil
}
return
} }
func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) {
yDelta := y2 - y1 panic("Not implemented")
xDelta := x2.Sub(x1)
dDt := yDelta / float32(xDelta)
offset := float32(e.Sub(x1))
return y1 + (offset * dDt)
} }
func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) { func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) {
begin := time.Now() panic("Not implemented")
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure})
}()
// TODO: memoize/cache this or change the return type to metric.SamplePair.
m, err := l.GetMetricForFingerprint(fp)
if err != nil {
return
}
// Candidate for Refactoring
k := &dto.SampleKey{
Fingerprint: fp.ToDTO(),
Timestamp: indexable.EncodeTime(t),
}
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
if !iterator.Seek(e) {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek after AND outside known range.
*
* Once a LevelDB iterator goes invalid, it cannot be recovered; thusly,
* we need to create a new in order to check if the last value in the
* database is sufficient for our purposes. This is, in all reality, a
* corner case but one that could bring down the system.
*/
iterator = l.metricSamples.NewIterator(true)
defer iterator.Close()
if !iterator.SeekToLast() {
/*
* For whatever reason, the LevelDB cannot be recovered.
*/
return
}
}
var (
firstKey *dto.SampleKey
firstValue *dto.SampleValueSeries
)
firstKey, err = extractSampleKey(iterator)
if err != nil {
return
}
peekAhead := false
if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) {
/*
* This allows us to grab values for metrics if our request time is after
* the last recorded time subject to the staleness policy due to the nuances
* of LevelDB storage:
*
* # Assumptions:
* - K0 < K1 in terms of sorting.
* - T0 < T1 in terms of sorting.
*
* # Data
*
* K0-T0
* K0-T1
* K0-T2
* K1-T0
* K1-T1
*
* # Scenario
* K0-T3, which does not exist, is requested. LevelDB will thusly seek to
* K1-T1, when K0-T2 exists as a perfectly good candidate to check subject
* to the provided staleness policy and such.
*/
peekAhead = true
}
firstTime := indexable.DecodeTime(firstKey.Timestamp)
if t.Before(firstTime) || peekAhead {
if !iterator.Previous() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek before AND outside known range.
*
* This is an explicit validation to ensure that if no previous values for
* the series are found, the query aborts.
*/
return
}
var (
alternativeKey *dto.SampleKey
alternativeValue *dto.SampleValueSeries
)
alternativeKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if !fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) {
return
}
/*
* At this point, we found a previous value in the same series in the
* database. LevelDB originally seeked to the subsequent element given
* the key, but we need to consider this adjacency instead.
*/
alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp)
firstKey = alternativeKey
firstValue = alternativeValue
firstTime = alternativeTime
}
firstDelta := firstTime.Sub(t)
if firstDelta < 0 {
firstDelta *= -1
}
if firstDelta > s.DeltaAllowance {
return
}
firstValue, err = extractSampleValues(iterator)
if err != nil {
return
}
sample = model.SampleFromDTO(m, &t, firstValue)
if firstDelta == time.Duration(0) {
return
}
if !iterator.Next() {
/*
* Two cases for this:
* 1.) Corruption in LevelDB.
* 2.) Key seek after AND outside known range.
*
* This means that there are no more values left in the storage; and if this
* point is reached, we know that the one that has been found is within the
* allowed staleness limits.
*/
return
}
var secondKey *dto.SampleKey
secondKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) {
return
} else {
/*
* At this point, current entry in the database has the same key as the
* previous. For this reason, the validation logic will expect that the
* distance between the two points shall not exceed the staleness policy
* allowed limit to reduce interpolation errors.
*
* For this reason, the sample is reset in case of other subsequent
* validation behaviors.
*/
sample = nil
}
secondTime := indexable.DecodeTime(secondKey.Timestamp)
totalDelta := secondTime.Sub(firstTime)
if totalDelta > s.DeltaAllowance {
return
}
var secondValue *dto.SampleValueSeries
secondValue, err = extractSampleValues(iterator)
if err != nil {
return
}
fValue := *firstValue.Value[0].Value
sValue := *secondValue.Value[0].Value
interpolated := interpolate(firstTime, secondTime, fValue, sValue, t)
sampleValue := &dto.SampleValueSeries{}
sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated})
sample = model.SampleFromDTO(m, &t, sampleValue)
return
}
func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.Interval) (v *model.SampleSet, err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}()
k := &dto.SampleKey{
Fingerprint: fp.ToDTO(),
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
predicate := keyIsOlderThan(i.NewestInclusive)
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
retrievedKey := &dto.SampleKey{}
retrievedKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if predicate(retrievedKey) {
break
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
break
}
retrievedValue, err := extractSampleValues(iterator)
if err != nil {
return nil, err
}
if v == nil {
// TODO: memoize/cache this or change the return type to metric.SamplePair.
m, err := l.GetMetricForFingerprint(fp)
if err != nil {
return v, err
}
v = &model.SampleSet{
Metric: *m,
}
}
v.Values = append(v.Values, model.SamplePair{
Value: model.SampleValue(*retrievedValue.Value[0].Value),
Timestamp: indexable.DecodeTime(retrievedKey.Timestamp),
})
}
// XXX: We should not explicitly sort here but rather rely on the datastore.
// This adds appreciable overhead.
if v != nil {
sort.Sort(v.Values)
}
return
} }
type MetricKeyDecoder struct{} type MetricKeyDecoder struct{}

View file

@ -19,7 +19,6 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/utility" "github.com/prometheus/prometheus/utility"
"github.com/ryszard/goskiplist/skiplist" "github.com/ryszard/goskiplist/skiplist"
"sort"
"time" "time"
) )
@ -148,6 +147,19 @@ func (s memorySeriesStorage) AppendSample(sample model.Sample) (err error) {
return return
} }
// Append raw sample, bypassing indexing. Only used to add data to views, which
// don't need to lookup by metric.
func (s memorySeriesStorage) appendSampleWithoutIndexing(f model.Fingerprint, timestamp time.Time, value model.SampleValue) {
series, ok := s.fingerprintToSeries[f]
if !ok {
series = newStream(model.Metric{})
s.fingerprintToSeries[f] = series
}
series.add(timestamp, value)
}
func (s memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { func (s memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) {
sets := []utility.Set{} sets := []utility.Set{}
@ -198,152 +210,99 @@ func (s memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metri
return return
} }
// XXX: Terrible wart. func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) {
func interpolateSample(x1, x2 time.Time, y1, y2 float32, e time.Time) model.SampleValue { series, ok := s.fingerprintToSeries[f]
yDelta := y2 - y1
xDelta := x2.Sub(x1)
dDt := yDelta / float32(xDelta)
offset := float32(e.Sub(x1))
return model.SampleValue(y1 + (offset * dDt))
}
func (s memorySeriesStorage) GetValueAtTime(fp model.Fingerprint, t time.Time, p StalenessPolicy) (sample *model.Sample, err error) {
series, ok := s.fingerprintToSeries[fp]
if !ok { if !ok {
return return
} }
iterator := series.values.Seek(skipListTime(t)) iterator := series.values.Seek(skipListTime(t))
if iterator == nil { if iterator == nil {
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering
// defined on skipListTime, this corresponds to the sample with the
// earliest timestamp.
iterator = series.values.SeekToLast()
if iterator == nil {
// The list is empty.
return
}
}
defer iterator.Close()
if iterator.Key() == nil || iterator.Value() == nil {
return return
} }
foundTime := time.Time(iterator.Key().(skipListTime)) foundTime := time.Time(iterator.Key().(skipListTime))
if foundTime.Equal(t) { samples = append(samples, model.SamplePair{
value := iterator.Value().(value) Timestamp: foundTime,
sample = &model.Sample{
Metric: series.metric,
Value: value.get(),
Timestamp: t,
}
return
}
if t.Sub(foundTime) > p.DeltaAllowance {
return
}
secondTime := foundTime
secondValue := iterator.Value().(value).get()
if !iterator.Previous() {
sample = &model.Sample{
Metric: series.metric,
Value: iterator.Value().(value).get(), Value: iterator.Value().(value).get(),
Timestamp: t, })
}
return
}
firstTime := time.Time(iterator.Key().(skipListTime)) if foundTime.Before(t) && iterator.Previous() {
if t.Sub(firstTime) > p.DeltaAllowance { samples = append(samples, model.SamplePair{
return Timestamp: time.Time(iterator.Key().(skipListTime)),
} Value: iterator.Value().(value).get(),
})
if firstTime.Sub(secondTime) > p.DeltaAllowance {
return
}
firstValue := iterator.Value().(value).get()
sample = &model.Sample{
Metric: series.metric,
Value: interpolateSample(firstTime, secondTime, float32(firstValue), float32(secondValue), t),
Timestamp: t,
} }
return return
} }
func (s memorySeriesStorage) GetBoundaryValues(fp model.Fingerprint, i model.Interval, p StalenessPolicy) (first *model.Sample, second *model.Sample, err error) { func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) {
first, err = s.GetValueAtTime(fp, i.OldestInclusive, p) first = s.GetValueAtTime(f, i.OldestInclusive)
if err != nil { second = s.GetValueAtTime(f, i.NewestInclusive)
return
} else if first == nil {
return
}
second, err = s.GetValueAtTime(fp, i.NewestInclusive, p)
if err != nil {
return
} else if second == nil {
first = nil
}
return return
} }
func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interval) (samples *model.SampleSet, err error) { func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) {
series, ok := s.fingerprintToSeries[fp] series, ok := s.fingerprintToSeries[f]
if !ok { if !ok {
return return
} }
samples = &model.SampleSet{ iterator := series.values.Seek(skipListTime(i.OldestInclusive))
Metric: series.metric,
}
iterator := series.values.Seek(skipListTime(i.NewestInclusive))
if iterator == nil { if iterator == nil {
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering
// defined on skipListTime, this corresponds to the sample with the
// earliest timestamp.
iterator = series.values.SeekToLast()
if iterator == nil {
// The list is empty.
return return
} }
}
defer iterator.Close()
for { for {
timestamp := time.Time(iterator.Key().(skipListTime)) timestamp := time.Time(iterator.Key().(skipListTime))
if timestamp.Before(i.OldestInclusive) { if timestamp.After(i.NewestInclusive) {
break break
} }
samples.Values = append(samples.Values, if !timestamp.Before(i.OldestInclusive) {
model.SamplePair{ samples = append(samples, model.SamplePair{
Value: iterator.Value().(value).get(), Value: iterator.Value().(value).get(),
Timestamp: timestamp, Timestamp: timestamp,
}) })
}
if !iterator.Next() { if !iterator.Previous() {
break break
} }
} }
// XXX: We should not explicitly sort here but rather rely on the datastore.
// This adds appreciable overhead.
if samples != nil {
sort.Sort(samples.Values)
}
return return
} }
func (s memorySeriesStorage) Close() { func (s memorySeriesStorage) Close() {
// This can probably be simplified: s.fingerprintToSeries = map[model.Fingerprint]stream{}
// s.labelPairToFingerprints = map[string]model.Fingerprints{}
// s.fingerPrintToSeries = map[model.Fingerprint]*stream{} s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{}
// s.labelPairToFingerprints = map[string]model.Fingerprints{}
// s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{}
for fingerprint := range s.fingerprintToSeries {
delete(s.fingerprintToSeries, fingerprint)
}
for labelPair := range s.labelPairToFingerprints {
delete(s.labelPairToFingerprints, labelPair)
}
for labelName := range s.labelNameToFingerprints {
delete(s.labelNameToFingerprints, labelName)
}
} }
func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) {

File diff suppressed because it is too large Load diff

View file

@ -15,7 +15,10 @@ package metric
import ( import (
"fmt" "fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/utility/test" "github.com/prometheus/prometheus/utility/test"
"math" "math"
"math/rand" "math/rand"
@ -185,6 +188,60 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste
} }
} }
func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples []model.SamplePair, err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure})
}()
k := &dto.SampleKey{
Fingerprint: fp.ToDTO(),
Timestamp: indexable.EncodeTime(i.OldestInclusive),
}
e, err := coding.NewProtocolBuffer(k).Encode()
if err != nil {
return
}
iterator := l.metricSamples.NewIterator(true)
defer iterator.Close()
predicate := keyIsOlderThan(i.NewestInclusive)
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
retrievedKey := &dto.SampleKey{}
retrievedKey, err = extractSampleKey(iterator)
if err != nil {
return
}
if predicate(retrievedKey) {
break
}
if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) {
break
}
retrievedValue, err := extractSampleValues(iterator)
if err != nil {
return nil, err
}
samples = append(samples, model.SamplePair{
Value: model.SampleValue(*retrievedValue.Value[0].Value),
Timestamp: indexable.DecodeTime(retrievedKey.Timestamp),
})
}
return
}
func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) {
stochastic := func(x int) (success bool) { stochastic := func(x int) (success bool) {
p, closer := persistenceMaker() p, closer := persistenceMaker()
@ -408,14 +465,22 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t
NewestInclusive: time.Unix(end, 0), NewestInclusive: time.Unix(end, 0),
} }
samples, err := p.GetRangeValues(model.NewFingerprintFromMetric(metric), interval) samples := []model.SamplePair{}
fp := model.NewFingerprintFromMetric(metric)
switch persistence := p.(type) {
case *LevelDBMetricPersistence:
var err error
samples, err = levelDBGetRangeValues(persistence, fp, interval)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
return return
} }
default:
samples = p.GetRangeValues(fp, interval)
}
if len(samples.Values) < 2 { if len(samples) < 2 {
t.Errorf("expected sample count less than %d, got %d", 2, len(samples.Values)) t.Errorf("expected sample count less than %d, got %d", 2, len(samples))
return return
} }
} }

View file

@ -15,7 +15,6 @@ package metric
import ( import (
"github.com/prometheus/prometheus/model" "github.com/prometheus/prometheus/model"
"github.com/ryszard/goskiplist/skiplist"
"sort" "sort"
"time" "time"
) )
@ -102,128 +101,13 @@ func (v viewRequestBuilder) ScanJobs() (j scanJobs) {
} }
type view struct { type view struct {
fingerprintToSeries map[model.Fingerprint]viewStream memorySeriesStorage
} }
func (v view) appendSample(fingerprint model.Fingerprint, timestamp time.Time, value model.SampleValue) { func (v view) appendSample(fingerprint model.Fingerprint, timestamp time.Time, value model.SampleValue) {
var ( v.appendSampleWithoutIndexing(fingerprint, timestamp, value)
series, ok = v.fingerprintToSeries[fingerprint]
)
if !ok {
series = newViewStream()
v.fingerprintToSeries[fingerprint] = series
}
series.add(timestamp, value)
}
func (v view) Close() {
v.fingerprintToSeries = make(map[model.Fingerprint]viewStream)
}
func (v view) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) {
series, ok := v.fingerprintToSeries[f]
if !ok {
return
}
iterator := series.values.Seek(skipListTime(t))
if iterator == nil {
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering
// defined on skipListTime, this corresponds to the sample with the
// earliest timestamp.
iterator = series.values.SeekToLast()
if iterator == nil {
// The list is empty.
return
}
}
defer iterator.Close()
if iterator.Key() == nil || iterator.Value() == nil {
return
}
samples = append(samples, model.SamplePair{
Timestamp: time.Time(iterator.Key().(skipListTime)),
Value: iterator.Value().(value).get(),
})
if iterator.Previous() {
samples = append(samples, model.SamplePair{
Timestamp: time.Time(iterator.Key().(skipListTime)),
Value: iterator.Value().(value).get(),
})
}
return
}
func (v view) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) {
first = v.GetValueAtTime(f, i.OldestInclusive)
second = v.GetValueAtTime(f, i.NewestInclusive)
return
}
func (v view) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) {
series, ok := v.fingerprintToSeries[f]
if !ok {
return
}
iterator := series.values.Seek(skipListTime(i.OldestInclusive))
if iterator == nil {
// If the iterator is nil, it means we seeked past the end of the series,
// so we seek to the last value instead. Due to the reverse ordering
// defined on skipListTime, this corresponds to the sample with the
// earliest timestamp.
iterator = series.values.SeekToLast()
if iterator == nil {
// The list is empty.
return
}
}
for {
timestamp := time.Time(iterator.Key().(skipListTime))
if timestamp.After(i.NewestInclusive) {
break
}
if !timestamp.Before(i.OldestInclusive) {
samples = append(samples, model.SamplePair{
Value: iterator.Value().(value).get(),
Timestamp: timestamp,
})
}
if !iterator.Previous() {
break
}
}
return
} }
func newView() view { func newView() view {
return view{ return view{NewMemorySeriesStorage()}
fingerprintToSeries: make(map[model.Fingerprint]viewStream),
}
}
type viewStream struct {
values *skiplist.SkipList
}
func (s viewStream) add(timestamp time.Time, value model.SampleValue) {
s.values.Set(skipListTime(timestamp), singletonValue(value))
}
func newViewStream() viewStream {
return viewStream{
values: skiplist.New(),
}
} }

View file

@ -54,7 +54,7 @@ func testBuilder(t test.Tester) {
in in in in
out out out out
}{ }{
// // Ensure that the fingerprint is sorted in proper order. // Ensure that the fingerprint is sorted in proper order.
{ {
in: in{ in: in{
atTimes: []atTime{ atTimes: []atTime{