From 63625bd24436ca41bdf6dd8d4be99f1ca468b8fd Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 18 Apr 2013 16:10:52 +0200 Subject: [PATCH] Make view use memory persistence, remove obsolete code. This makes the memory persistence the backing store for views and adjusts the MetricPersistence interface accordingly. It also removes unused Get* method implementations from the LevelDB persistence so they don't need to be adapted to the new interface. In the future, we should rethink these interfaces. All staleness and interpolation handling is now removed from the storage layer and will be handled only by the query layer in the future. --- rules/ast/persistence_adapter.go | 13 +- storage/metric/end_to_end_test.go | 30 +- storage/metric/interface.go | 13 +- storage/metric/leveldb.go | 318 +-------- storage/metric/memory.go | 165 ++--- storage/metric/rule_integration_test.go | 820 ++++++------------------ storage/metric/stochastic_test.go | 77 ++- storage/metric/view.go | 122 +--- storage/metric/view_test.go | 2 +- 9 files changed, 364 insertions(+), 1196 deletions(-) diff --git a/rules/ast/persistence_adapter.go b/rules/ast/persistence_adapter.go index 682f643e6..6ea72ad07 100644 --- a/rules/ast/persistence_adapter.go +++ b/rules/ast/persistence_adapter.go @@ -26,9 +26,16 @@ var defaultStalenessDelta = flag.Int("defaultStalenessDelta", 300, "Default stal // (i.e. metric->fingerprint lookups). var queryStorage metric.Storage = nil +// Describes the lenience limits to apply to values from the materialized view. +type StalenessPolicy struct { + // Describes the inclusive limit at which individual points if requested will + // be matched and subject to interpolation. + DeltaAllowance time.Duration +} + type viewAdapter struct { view metric.View - stalenessPolicy *metric.StalenessPolicy + stalenessPolicy StalenessPolicy } // interpolateSamples interpolates a value at a target time between two @@ -165,12 +172,12 @@ func SetStorage(storage metric.Storage) { } func NewViewAdapter(view metric.View) *viewAdapter { - stalenessPolicy := metric.StalenessPolicy{ + stalenessPolicy := StalenessPolicy{ DeltaAllowance: time.Duration(*defaultStalenessDelta) * time.Second, } return &viewAdapter{ view: view, - stalenessPolicy: &stalenessPolicy, + stalenessPolicy: stalenessPolicy, } } diff --git a/storage/metric/end_to_end_test.go b/storage/metric/end_to_end_test.go index 0f581b0ca..737e2ca09 100644 --- a/storage/metric/end_to_end_test.go +++ b/storage/metric/end_to_end_test.go @@ -271,18 +271,17 @@ func AppendRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) - if err != nil { - t.Fatal(err) - } - if sample == nil { - t.Fatal("expected non-nil sample.") + samples := p.GetValueAtTime(fingerprints[0], time) + if len(samples) == 0 { + t.Fatal("expected at least one sample.") } expected := model.SampleValue(i) - if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + for _, sample := range samples { + if sample.Value != expected { + t.Fatalf("expected %d value, got %d", expected, sample.Value) + } } } } @@ -334,18 +333,17 @@ func AppendsRepeatingValuesTests(p MetricPersistence, t test.Tester) { } time := time.Time{}.Add(time.Duration(i) * time.Hour).Add(time.Duration(j) * time.Second) - sample, err := p.GetValueAtTime(fingerprints[0], time, StalenessPolicy{}) - if err != nil { - t.Fatal(err) - } - if sample == nil { - t.Fatal("expected non-nil sample.") + samples := p.GetValueAtTime(fingerprints[0], time) + if len(samples) == 0 { + t.Fatal("expected at least one sample.") } expected := model.SampleValue(i) - if sample.Value != expected { - t.Fatalf("expected %d value, got %d", expected, sample.Value) + for _, sample := range samples { + if sample.Value != expected { + t.Fatalf("expected %d value, got %d", expected, sample.Value) + } } } } diff --git a/storage/metric/interface.go b/storage/metric/interface.go index 27358c657..757cd19db 100644 --- a/storage/metric/interface.go +++ b/storage/metric/interface.go @@ -47,9 +47,9 @@ type MetricPersistence interface { // Get the metric associated with the provided fingerprint. GetMetricForFingerprint(model.Fingerprint) (*model.Metric, error) - GetValueAtTime(model.Fingerprint, time.Time, StalenessPolicy) (*model.Sample, error) - GetBoundaryValues(model.Fingerprint, model.Interval, StalenessPolicy) (*model.Sample, *model.Sample, error) - GetRangeValues(model.Fingerprint, model.Interval) (*model.SampleSet, error) + GetValueAtTime(model.Fingerprint, time.Time) []model.SamplePair + GetBoundaryValues(model.Fingerprint, model.Interval) (first []model.SamplePair, second []model.SamplePair) + GetRangeValues(model.Fingerprint, model.Interval) []model.SamplePair ForEachSample(IteratorsForFingerprintBuilder) (err error) @@ -61,13 +61,6 @@ type MetricPersistence interface { // MakeView(builder ViewRequestBuilder, deadline time.Duration) (View, error) } -// Describes the lenience limits for querying the materialized View. -type StalenessPolicy struct { - // Describes the inclusive limit at which individual points if requested will - // be matched and subject to interpolation. - DeltaAllowance time.Duration -} - // View provides view of the values in the datastore subject to the request of a // preloading operation. type View interface { diff --git a/storage/metric/leveldb.go b/storage/metric/leveldb.go index fef8e4e89..d3acbf46a 100644 --- a/storage/metric/leveldb.go +++ b/storage/metric/leveldb.go @@ -862,322 +862,16 @@ func (l *LevelDBMetricPersistence) GetMetricForFingerprint(f model.Fingerprint) return } -func (l *LevelDBMetricPersistence) GetBoundaryValues(fp model.Fingerprint, i model.Interval, s StalenessPolicy) (open *model.Sample, end *model.Sample, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getBoundaryValues, result: success}, map[string]string{operation: getBoundaryValues, result: failure}) - }() - - // XXX: Maybe we will want to emit incomplete sets? - open, err = l.GetValueAtTime(fp, i.OldestInclusive, s) - if err != nil { - return - } else if open == nil { - return - } - - end, err = l.GetValueAtTime(fp, i.NewestInclusive, s) - if err != nil { - return - } else if end == nil { - open = nil - } - - return +func (l LevelDBMetricPersistence) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { + panic("Not implemented") } -func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 { - yDelta := y2 - y1 - xDelta := x2.Sub(x1) - - dDt := yDelta / float32(xDelta) - offset := float32(e.Sub(x1)) - - return y1 + (offset * dDt) +func (l LevelDBMetricPersistence) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { + panic("Not implemented") } -func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getValueAtTime, result: success}, map[string]string{operation: getValueAtTime, result: failure}) - }() - - // TODO: memoize/cache this or change the return type to metric.SamplePair. - m, err := l.GetMetricForFingerprint(fp) - if err != nil { - return - } - - // Candidate for Refactoring - k := &dto.SampleKey{ - Fingerprint: fp.ToDTO(), - Timestamp: indexable.EncodeTime(t), - } - - e, err := coding.NewProtocolBuffer(k).Encode() - if err != nil { - return - } - - iterator := l.metricSamples.NewIterator(true) - defer iterator.Close() - - if !iterator.Seek(e) { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek after AND outside known range. - * - * Once a LevelDB iterator goes invalid, it cannot be recovered; thusly, - * we need to create a new in order to check if the last value in the - * database is sufficient for our purposes. This is, in all reality, a - * corner case but one that could bring down the system. - */ - iterator = l.metricSamples.NewIterator(true) - defer iterator.Close() - - if !iterator.SeekToLast() { - /* - * For whatever reason, the LevelDB cannot be recovered. - */ - return - } - } - - var ( - firstKey *dto.SampleKey - firstValue *dto.SampleValueSeries - ) - - firstKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - peekAhead := false - - if !fingerprintsEqual(firstKey.Fingerprint, k.Fingerprint) { - /* - * This allows us to grab values for metrics if our request time is after - * the last recorded time subject to the staleness policy due to the nuances - * of LevelDB storage: - * - * # Assumptions: - * - K0 < K1 in terms of sorting. - * - T0 < T1 in terms of sorting. - * - * # Data - * - * K0-T0 - * K0-T1 - * K0-T2 - * K1-T0 - * K1-T1 - * - * # Scenario - * K0-T3, which does not exist, is requested. LevelDB will thusly seek to - * K1-T1, when K0-T2 exists as a perfectly good candidate to check subject - * to the provided staleness policy and such. - */ - peekAhead = true - } - - firstTime := indexable.DecodeTime(firstKey.Timestamp) - if t.Before(firstTime) || peekAhead { - if !iterator.Previous() { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek before AND outside known range. - * - * This is an explicit validation to ensure that if no previous values for - * the series are found, the query aborts. - */ - return - } - - var ( - alternativeKey *dto.SampleKey - alternativeValue *dto.SampleValueSeries - ) - - alternativeKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if !fingerprintsEqual(alternativeKey.Fingerprint, k.Fingerprint) { - return - } - - /* - * At this point, we found a previous value in the same series in the - * database. LevelDB originally seeked to the subsequent element given - * the key, but we need to consider this adjacency instead. - */ - alternativeTime := indexable.DecodeTime(alternativeKey.Timestamp) - - firstKey = alternativeKey - firstValue = alternativeValue - firstTime = alternativeTime - } - - firstDelta := firstTime.Sub(t) - if firstDelta < 0 { - firstDelta *= -1 - } - if firstDelta > s.DeltaAllowance { - return - } - - firstValue, err = extractSampleValues(iterator) - if err != nil { - return - } - - sample = model.SampleFromDTO(m, &t, firstValue) - - if firstDelta == time.Duration(0) { - return - } - - if !iterator.Next() { - /* - * Two cases for this: - * 1.) Corruption in LevelDB. - * 2.) Key seek after AND outside known range. - * - * This means that there are no more values left in the storage; and if this - * point is reached, we know that the one that has been found is within the - * allowed staleness limits. - */ - return - } - - var secondKey *dto.SampleKey - - secondKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if !fingerprintsEqual(secondKey.Fingerprint, k.Fingerprint) { - return - } else { - /* - * At this point, current entry in the database has the same key as the - * previous. For this reason, the validation logic will expect that the - * distance between the two points shall not exceed the staleness policy - * allowed limit to reduce interpolation errors. - * - * For this reason, the sample is reset in case of other subsequent - * validation behaviors. - */ - sample = nil - } - - secondTime := indexable.DecodeTime(secondKey.Timestamp) - - totalDelta := secondTime.Sub(firstTime) - if totalDelta > s.DeltaAllowance { - return - } - - var secondValue *dto.SampleValueSeries - - secondValue, err = extractSampleValues(iterator) - if err != nil { - return - } - - fValue := *firstValue.Value[0].Value - sValue := *secondValue.Value[0].Value - - interpolated := interpolate(firstTime, secondTime, fValue, sValue, t) - - sampleValue := &dto.SampleValueSeries{} - sampleValue.Value = append(sampleValue.Value, &dto.SampleValueSeries_Value{Value: &interpolated}) - - sample = model.SampleFromDTO(m, &t, sampleValue) - - return -} - -func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.Interval) (v *model.SampleSet, err error) { - begin := time.Now() - - defer func() { - duration := time.Since(begin) - - recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) - }() - - k := &dto.SampleKey{ - Fingerprint: fp.ToDTO(), - Timestamp: indexable.EncodeTime(i.OldestInclusive), - } - - e, err := coding.NewProtocolBuffer(k).Encode() - if err != nil { - return - } - - iterator := l.metricSamples.NewIterator(true) - defer iterator.Close() - - predicate := keyIsOlderThan(i.NewestInclusive) - - for valid := iterator.Seek(e); valid; valid = iterator.Next() { - retrievedKey := &dto.SampleKey{} - - retrievedKey, err = extractSampleKey(iterator) - if err != nil { - return - } - - if predicate(retrievedKey) { - break - } - - if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { - break - } - - retrievedValue, err := extractSampleValues(iterator) - if err != nil { - return nil, err - } - - if v == nil { - // TODO: memoize/cache this or change the return type to metric.SamplePair. - m, err := l.GetMetricForFingerprint(fp) - if err != nil { - return v, err - } - v = &model.SampleSet{ - Metric: *m, - } - } - - v.Values = append(v.Values, model.SamplePair{ - Value: model.SampleValue(*retrievedValue.Value[0].Value), - Timestamp: indexable.DecodeTime(retrievedKey.Timestamp), - }) - } - - // XXX: We should not explicitly sort here but rather rely on the datastore. - // This adds appreciable overhead. - if v != nil { - sort.Sort(v.Values) - } - - return +func (l *LevelDBMetricPersistence) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { + panic("Not implemented") } type MetricKeyDecoder struct{} diff --git a/storage/metric/memory.go b/storage/metric/memory.go index b62263de3..08c80c84b 100644 --- a/storage/metric/memory.go +++ b/storage/metric/memory.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/utility" "github.com/ryszard/goskiplist/skiplist" - "sort" "time" ) @@ -148,6 +147,19 @@ func (s memorySeriesStorage) AppendSample(sample model.Sample) (err error) { return } +// Append raw sample, bypassing indexing. Only used to add data to views, which +// don't need to lookup by metric. +func (s memorySeriesStorage) appendSampleWithoutIndexing(f model.Fingerprint, timestamp time.Time, value model.SampleValue) { + series, ok := s.fingerprintToSeries[f] + + if !ok { + series = newStream(model.Metric{}) + s.fingerprintToSeries[f] = series + } + + series.add(timestamp, value) +} + func (s memorySeriesStorage) GetFingerprintsForLabelSet(l model.LabelSet) (fingerprints model.Fingerprints, err error) { sets := []utility.Set{} @@ -198,152 +210,99 @@ func (s memorySeriesStorage) GetMetricForFingerprint(f model.Fingerprint) (metri return } -// XXX: Terrible wart. -func interpolateSample(x1, x2 time.Time, y1, y2 float32, e time.Time) model.SampleValue { - yDelta := y2 - y1 - xDelta := x2.Sub(x1) - - dDt := yDelta / float32(xDelta) - offset := float32(e.Sub(x1)) - - return model.SampleValue(y1 + (offset * dDt)) -} - -func (s memorySeriesStorage) GetValueAtTime(fp model.Fingerprint, t time.Time, p StalenessPolicy) (sample *model.Sample, err error) { - series, ok := s.fingerprintToSeries[fp] +func (s memorySeriesStorage) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { + series, ok := s.fingerprintToSeries[f] if !ok { return } iterator := series.values.Seek(skipListTime(t)) if iterator == nil { + // If the iterator is nil, it means we seeked past the end of the series, + // so we seek to the last value instead. Due to the reverse ordering + // defined on skipListTime, this corresponds to the sample with the + // earliest timestamp. + iterator = series.values.SeekToLast() + if iterator == nil { + // The list is empty. + return + } + } + + defer iterator.Close() + + if iterator.Key() == nil || iterator.Value() == nil { return } foundTime := time.Time(iterator.Key().(skipListTime)) - if foundTime.Equal(t) { - value := iterator.Value().(value) - sample = &model.Sample{ - Metric: series.metric, - Value: value.get(), - Timestamp: t, - } + samples = append(samples, model.SamplePair{ + Timestamp: foundTime, + Value: iterator.Value().(value).get(), + }) - return - } - - if t.Sub(foundTime) > p.DeltaAllowance { - return - } - - secondTime := foundTime - secondValue := iterator.Value().(value).get() - - if !iterator.Previous() { - sample = &model.Sample{ - Metric: series.metric, + if foundTime.Before(t) && iterator.Previous() { + samples = append(samples, model.SamplePair{ + Timestamp: time.Time(iterator.Key().(skipListTime)), Value: iterator.Value().(value).get(), - Timestamp: t, - } - return - } - - firstTime := time.Time(iterator.Key().(skipListTime)) - if t.Sub(firstTime) > p.DeltaAllowance { - return - } - - if firstTime.Sub(secondTime) > p.DeltaAllowance { - return - } - - firstValue := iterator.Value().(value).get() - - sample = &model.Sample{ - Metric: series.metric, - Value: interpolateSample(firstTime, secondTime, float32(firstValue), float32(secondValue), t), - Timestamp: t, + }) } return } -func (s memorySeriesStorage) GetBoundaryValues(fp model.Fingerprint, i model.Interval, p StalenessPolicy) (first *model.Sample, second *model.Sample, err error) { - first, err = s.GetValueAtTime(fp, i.OldestInclusive, p) - if err != nil { - return - } else if first == nil { - return - } - - second, err = s.GetValueAtTime(fp, i.NewestInclusive, p) - if err != nil { - return - } else if second == nil { - first = nil - } - +func (s memorySeriesStorage) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { + first = s.GetValueAtTime(f, i.OldestInclusive) + second = s.GetValueAtTime(f, i.NewestInclusive) return } -func (s memorySeriesStorage) GetRangeValues(fp model.Fingerprint, i model.Interval) (samples *model.SampleSet, err error) { - series, ok := s.fingerprintToSeries[fp] +func (s memorySeriesStorage) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { + series, ok := s.fingerprintToSeries[f] if !ok { return } - samples = &model.SampleSet{ - Metric: series.metric, + iterator := series.values.Seek(skipListTime(i.OldestInclusive)) + if iterator == nil { + // If the iterator is nil, it means we seeked past the end of the series, + // so we seek to the last value instead. Due to the reverse ordering + // defined on skipListTime, this corresponds to the sample with the + // earliest timestamp. + iterator = series.values.SeekToLast() + if iterator == nil { + // The list is empty. + return + } } - iterator := series.values.Seek(skipListTime(i.NewestInclusive)) - if iterator == nil { - return - } + defer iterator.Close() for { timestamp := time.Time(iterator.Key().(skipListTime)) - if timestamp.Before(i.OldestInclusive) { + if timestamp.After(i.NewestInclusive) { break } - samples.Values = append(samples.Values, - model.SamplePair{ + if !timestamp.Before(i.OldestInclusive) { + samples = append(samples, model.SamplePair{ Value: iterator.Value().(value).get(), Timestamp: timestamp, }) + } - if !iterator.Next() { + if !iterator.Previous() { break } } - // XXX: We should not explicitly sort here but rather rely on the datastore. - // This adds appreciable overhead. - if samples != nil { - sort.Sort(samples.Values) - } - return } func (s memorySeriesStorage) Close() { - // This can probably be simplified: - // - // s.fingerPrintToSeries = map[model.Fingerprint]*stream{} - // s.labelPairToFingerprints = map[string]model.Fingerprints{} - // s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{} - for fingerprint := range s.fingerprintToSeries { - delete(s.fingerprintToSeries, fingerprint) - } - - for labelPair := range s.labelPairToFingerprints { - delete(s.labelPairToFingerprints, labelPair) - } - - for labelName := range s.labelNameToFingerprints { - delete(s.labelNameToFingerprints, labelName) - } + s.fingerprintToSeries = map[model.Fingerprint]stream{} + s.labelPairToFingerprints = map[string]model.Fingerprints{} + s.labelNameToFingerprints = map[model.LabelName]model.Fingerprints{} } func (s memorySeriesStorage) GetAllValuesForLabel(labelName model.LabelName) (values model.LabelValues, err error) { diff --git a/storage/metric/rule_integration_test.go b/storage/metric/rule_integration_test.go index 65dcec2ea..c7c6e2fec 100644 --- a/storage/metric/rule_integration_test.go +++ b/storage/metric/rule_integration_test.go @@ -30,21 +30,18 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer } type input struct { - year int - month time.Month - day int - hour int - staleness time.Duration + year int + month time.Month + day int + hour int } - type output struct { - value model.SampleValue - } + type output []model.SampleValue type behavior struct { name string input input - output *output + output output } var contexts = []struct { @@ -59,11 +56,10 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer { name: "random target", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, }, }, @@ -81,92 +77,39 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact without staleness policy", + name: "exact", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact with staleness policy", + name: "before", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1984, + month: 3, + day: 29, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "before without staleness policy", + name: "after", input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 31, + hour: 0, }, - }, - { - name: "before within staleness policy", - input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "before outside staleness policy", - input: input{ - year: 1984, - month: 3, - day: 29, - hour: 0, - staleness: time.Duration(1) * time.Hour, - }, - }, - { - name: "after without staleness policy", - input: input{ - year: 1984, - month: 3, - day: 31, - hour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "after within staleness policy", - input: input{ - year: 1984, - month: 3, - day: 31, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0, - }, - }, - { - name: "after outside staleness policy", - input: input{ - year: 1984, - month: 4, - day: 7, - hour: 0, - staleness: time.Duration(7*24) * time.Hour, + output: output{ + 0, }, }, }, @@ -191,131 +134,64 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact first without staleness policy", + name: "exact first", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact first with staleness policy", + name: "exact second", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 1, }, }, { - name: "exact second without staleness policy", + name: "before first", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1983, + month: 9, + day: 29, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 0, }, }, { - name: "exact second with staleness policy", + name: "after second", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 1, }, }, { - name: "before first without staleness policy", + name: "middle", input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(0), + year: 1984, + month: 9, + day: 28, + hour: 12, }, - }, - { - name: "before first with staleness policy", - input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after second with staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 1, - }, - }, - { - name: "after second without staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "middle without staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "middle with insufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "middle with sufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0.5, + output: output{ + 0, + 1, }, }, }, @@ -347,200 +223,89 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer }, behaviors: []behavior{ { - name: "exact first without staleness policy", + name: "exact first", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1984, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 0, }, }, { - name: "exact first with staleness policy", + name: "exact second", input: input{ - year: 1984, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1985, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 0, + output: output{ + 1, }, }, { - name: "exact second without staleness policy", + name: "exact third", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1986, + month: 3, + day: 30, + hour: 0, }, - output: &output{ - value: 1, + output: output{ + 2, }, }, { - name: "exact second with staleness policy", + name: "before first", input: input{ - year: 1985, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1983, + month: 9, + day: 29, + hour: 12, }, - output: &output{ - value: 1, + output: output{ + 0, }, }, { - name: "exact third without staleness policy", + name: "after third", input: input{ - year: 1986, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(0), + year: 1986, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 2, + output: output{ + 2, }, }, { - name: "exact third with staleness policy", + name: "first middle", input: input{ - year: 1986, - month: 3, - day: 30, - hour: 0, - staleness: time.Duration(365*24) * time.Hour, + year: 1984, + month: 9, + day: 28, + hour: 12, }, - output: &output{ - value: 2, + output: output{ + 0, + 1, }, }, { - name: "before first without staleness policy", + name: "second middle", input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(0), + year: 1985, + month: 9, + day: 28, + hour: 12, }, - }, - { - name: "before first with staleness policy", - input: input{ - year: 1983, - month: 9, - day: 29, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after third within staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 2, - }, - }, - { - name: "after third outside staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(1*24) * time.Hour, - }, - }, - { - name: "after third without staleness policy", - input: input{ - year: 1986, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "first middle without staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "first middle with insufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "first middle with sufficient staleness policy", - input: input{ - year: 1984, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 0.5, - }, - }, - { - name: "second middle without staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(0), - }, - }, - { - name: "second middle with insufficient staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(364*24) * time.Hour, - }, - }, - { - name: "second middle with sufficient staleness policy", - input: input{ - year: 1985, - month: 9, - day: 28, - hour: 12, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - value: 1.5, + output: output{ + 1, + 2, }, }, }, @@ -570,27 +335,16 @@ func GetValueAtTimeTests(persistenceMaker func() (MetricPersistence, test.Closer for j, behavior := range context.behaviors { input := behavior.input time := time.Date(input.year, input.month, input.day, input.hour, 0, 0, 0, time.UTC) - sp := StalenessPolicy{ - DeltaAllowance: input.staleness, + + actual := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time) + + if len(behavior.output) != len(actual) { + t.Fatalf("%d.%d(%s). Expected %d samples but got: %v\n", i, j, behavior.name, len(behavior.output), actual) } + for k, samplePair := range actual { + if samplePair.Value != behavior.output[k] { + t.Fatalf("%d.%d.%d(%s). Expected %s but got %s\n", i, j, k, behavior.name, behavior.output[k], samplePair) - actual, err := p.GetValueAtTime(model.NewFingerprintFromMetric(m), time, sp) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) - } - - if behavior.output == nil { - if actual != nil { - t.Fatalf("%d.%d(%s). Expected nil but got: %q\n", i, j, behavior.name, actual) - } - } else { - if actual == nil { - t.Fatalf("%d.%d(%s). Expected %s but got nil\n", i, j, behavior.name, behavior.output) - } else { - if actual.Value != behavior.output.value { - t.Fatalf("%d.%d(%s). Expected %s but got %s\n", i, j, behavior.name, behavior.output, actual) - - } } } } @@ -616,18 +370,17 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth time.Month endDay int endHour int - staleness time.Duration } type output struct { - open model.SampleValue - end model.SampleValue + open []model.SampleValue + end []model.SampleValue } type behavior struct { name string input input - output *output + output output } var contexts = []struct { @@ -640,7 +393,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo values: []value{}, behaviors: []behavior{ { - name: "non-existent interval without staleness policy", + name: "non-existent interval", input: input{ openYear: 1984, openMonth: 3, @@ -650,21 +403,6 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "non-existent interval with staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, }, }, }, @@ -682,7 +420,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo }, behaviors: []behavior{ { - name: "on start but missing end without staleness policy", + name: "on start but missing end", input: input{ openYear: 1984, openMonth: 3, @@ -692,11 +430,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval after within staleness policy", + name: "non-existent interval after", input: input{ openYear: 1984, openMonth: 3, @@ -706,25 +447,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(4380) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval after without staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 31, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "non-existent interval before with staleness policy", + name: "non-existent interval before", input: input{ openYear: 1983, openMonth: 3, @@ -734,25 +464,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 29, endHour: 0, - staleness: time.Duration(365*24) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "non-existent interval before without staleness policy", - input: input{ - openYear: 1983, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 3, - endDay: 29, - endHour: 0, - staleness: time.Duration(0), - }, - }, - { - name: "on end but not start without staleness policy", + name: "on end but not start", input: input{ openYear: 1983, openMonth: 3, @@ -762,25 +481,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "on end but not start without staleness policy", - input: input{ - openYear: 1983, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "before point without staleness policy", + name: "before point", input: input{ openYear: 1982, openMonth: 3, @@ -790,25 +498,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "before point with staleness policy", - input: input{ - openYear: 1982, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1983, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "after point without staleness policy", + name: "after point", input: input{ openYear: 1985, openMonth: 3, @@ -818,25 +515,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, { - name: "after point with staleness policy", - input: input{ - openYear: 1985, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1986, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - }, - { - name: "spanning point without staleness policy", + name: "spanning point", input: input{ openYear: 1983, openMonth: 9, @@ -846,21 +532,10 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 9, endDay: 28, endHour: 12, - staleness: time.Duration(0), }, - }, - { - name: "spanning point with staleness policy", - input: input{ - openYear: 1983, - openMonth: 9, - openDay: 29, - openHour: 12, - endYear: 1984, - endMonth: 9, - endDay: 28, - endHour: 12, - staleness: time.Duration(365*24) * time.Hour, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0}, }, }, }, @@ -885,7 +560,7 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo }, behaviors: []behavior{ { - name: "on points without staleness policy", + name: "on points", input: input{ openYear: 1984, openMonth: 3, @@ -895,33 +570,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 3, endDay: 30, endHour: 0, - staleness: time.Duration(0), }, - output: &output{ - open: 0, - end: 1, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{1}, }, }, { - name: "on points with staleness policy", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 3, - endDay: 30, - endHour: 0, - staleness: time.Duration(365*24) * time.Hour, - }, - output: &output{ - open: 0, - end: 1, - }, - }, - { - name: "on first before second outside of staleness", + name: "on first before second", input: input{ openYear: 1984, openMonth: 3, @@ -931,25 +587,14 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(2190) * time.Hour, + }, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{0, 1}, }, }, { - name: "on first before second within staleness", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1984, - endMonth: 6, - endDay: 29, - endHour: 6, - staleness: time.Duration(356*24) * time.Hour, - }, - }, - { - name: "on first after second outside of staleness", + name: "on first after second", input: input{ openYear: 1984, openMonth: 3, @@ -959,25 +604,10 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo endMonth: 6, endDay: 29, endHour: 6, - staleness: time.Duration(1) * time.Hour, }, - }, - { - name: "on first after second within staleness", - input: input{ - openYear: 1984, - openMonth: 3, - openDay: 30, - openHour: 0, - endYear: 1985, - endMonth: 6, - endDay: 29, - endHour: 6, - staleness: time.Duration(356*24) * time.Hour, - }, - output: &output{ - open: 0, - end: 1, + output: output{ + open: []model.SampleValue{0}, + end: []model.SampleValue{1}, }, }, }, @@ -1012,35 +642,23 @@ func GetBoundaryValuesTests(persistenceMaker func() (MetricPersistence, test.Clo OldestInclusive: open, NewestInclusive: end, } - po := StalenessPolicy{ - DeltaAllowance: input.staleness, + openValues, endValues := p.GetBoundaryValues(model.NewFingerprintFromMetric(m), interval) + if len(behavior.output.open) != len(openValues) { + t.Fatalf("%d.%d(%s). Expected %d open values but got: %q\n", i, j, behavior.name, len(behavior.output.open), openValues) + } + if len(behavior.output.end) != len(endValues) { + t.Fatalf("%d.%d(%s). Expected %d open values but got: %q\n", i, j, behavior.name, len(behavior.output.open), openValues) } - openValue, endValue, err := p.GetBoundaryValues(model.NewFingerprintFromMetric(m), interval, po) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) + for k, samplePair := range openValues { + if samplePair.Value != behavior.output.open[k] { + t.Fatalf("%d.%d.%d(%s). Expected open to be %v but got %v\n", i, j, k, behavior.name, behavior.output.open[k], samplePair.Value) + } } - if behavior.output == nil { - if openValue != nil { - t.Fatalf("%d.%d(%s). Expected open to be nil but got: %q\n", i, j, behavior.name, openValue) - } - if endValue != nil { - t.Fatalf("%d.%d(%s). Expected end to be nil but got: %q\n", i, j, behavior.name, endValue) - } - } else { - if openValue == nil { - t.Fatalf("%d.%d(%s). Expected open to be %s but got nil\n", i, j, behavior.name, behavior.output) - } - if endValue == nil { - t.Fatalf("%d.%d(%s). Expected end to be %s but got nil\n", i, j, behavior.name, behavior.output) - } - if openValue.Value != behavior.output.open { - t.Fatalf("%d.%d(%s). Expected open to be %s but got %s\n", i, j, behavior.name, behavior.output.open, openValue.Value) - } - - if endValue.Value != behavior.output.end { - t.Fatalf("%d.%d(%s). Expected end to be %s but got %s\n", i, j, behavior.name, behavior.output.end, endValue.Value) + for k, samplePair := range endValues { + if samplePair.Value != behavior.output.end[k] { + t.Fatalf("%d.%d.%d(%s). Expected end to be %v but got %v\n", i, j, k, behavior.name, behavior.output.end[k], samplePair.Value) } } } @@ -1362,10 +980,7 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer NewestInclusive: end, } - values, err := p.GetRangeValues(model.NewFingerprintFromMetric(m), in) - if err != nil { - t.Fatalf("%d.%d(%s). Could not query for value: %q\n", i, j, behavior.name, err) - } + values := p.GetRangeValues(model.NewFingerprintFromMetric(m), in) if values == nil && len(behavior.output) != 0 { t.Fatalf("%d.%d(%s). Expected %s but got: %s\n", i, j, behavior.name, behavior.output, values) @@ -1376,11 +991,11 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer t.Fatalf("%d.%d(%s). Expected nil values but got: %s\n", i, j, behavior.name, values) } } else { - if len(behavior.output) != len(values.Values) { - t.Fatalf("%d.%d(%s). Expected length %d but got: %d\n", i, j, behavior.name, len(behavior.output), len(values.Values)) + if len(behavior.output) != len(values) { + t.Fatalf("%d.%d(%s). Expected length %d but got: %d\n", i, j, behavior.name, len(behavior.output), len(values)) } - for k, actual := range values.Values { + for k, actual := range values { expected := behavior.output[k] if actual.Value != model.SampleValue(expected.value) { @@ -1412,53 +1027,6 @@ func GetRangeValuesTests(persistenceMaker func() (MetricPersistence, test.Closer // Test Definitions Follow -func testLevelDBGetValueAtTime(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_value_at_time", t) - GetValueAtTimeTests(persistenceMaker, t) -} - -func TestLevelDBGetValueAtTime(t *testing.T) { - testLevelDBGetValueAtTime(t) -} - -func BenchmarkLevelDBGetValueAtTime(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetValueAtTime(b) - } -} - -func testLevelDBGetBoundaryValues(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_boundary_values", t) - - GetBoundaryValuesTests(persistenceMaker, t) -} - -func TestLevelDBGetBoundaryValues(t *testing.T) { - testLevelDBGetBoundaryValues(t) -} - -func BenchmarkLevelDBGetBoundaryValues(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetBoundaryValues(b) - } -} - -func testLevelDBGetRangeValues(t test.Tester) { - persistenceMaker := buildLevelDBTestPersistencesMaker("get_range_values", t) - - GetRangeValuesTests(persistenceMaker, t) -} - -func TestLevelDBGetRangeValues(t *testing.T) { - testLevelDBGetRangeValues(t) -} - -func BenchmarkLevelDBGetRangeValues(b *testing.B) { - for i := 0; i < b.N; i++ { - testLevelDBGetRangeValues(b) - } -} - func testMemoryGetValueAtTime(t test.Tester) { persistenceMaker := func() (MetricPersistence, test.Closer) { return NewMemorySeriesStorage(), test.NilCloser diff --git a/storage/metric/stochastic_test.go b/storage/metric/stochastic_test.go index 6045da867..1c22a0ee3 100644 --- a/storage/metric/stochastic_test.go +++ b/storage/metric/stochastic_test.go @@ -15,7 +15,10 @@ package metric import ( "fmt" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/coding/indexable" "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" "github.com/prometheus/prometheus/utility/test" "math" "math/rand" @@ -185,6 +188,60 @@ func AppendSampleAsPureSingleEntityAppendTests(p MetricPersistence, t test.Teste } } +func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp model.Fingerprint, i model.Interval) (samples []model.SamplePair, err error) { + begin := time.Now() + + defer func() { + duration := time.Since(begin) + + recordOutcome(duration, err, map[string]string{operation: getRangeValues, result: success}, map[string]string{operation: getRangeValues, result: failure}) + }() + + k := &dto.SampleKey{ + Fingerprint: fp.ToDTO(), + Timestamp: indexable.EncodeTime(i.OldestInclusive), + } + + e, err := coding.NewProtocolBuffer(k).Encode() + if err != nil { + return + } + + iterator := l.metricSamples.NewIterator(true) + defer iterator.Close() + + predicate := keyIsOlderThan(i.NewestInclusive) + + for valid := iterator.Seek(e); valid; valid = iterator.Next() { + retrievedKey := &dto.SampleKey{} + + retrievedKey, err = extractSampleKey(iterator) + if err != nil { + return + } + + if predicate(retrievedKey) { + break + } + + if !fingerprintsEqual(retrievedKey.Fingerprint, k.Fingerprint) { + break + } + + retrievedValue, err := extractSampleValues(iterator) + if err != nil { + return nil, err + } + + samples = append(samples, model.SamplePair{ + Value: model.SampleValue(*retrievedValue.Value[0].Value), + Timestamp: indexable.DecodeTime(retrievedKey.Timestamp), + }) + } + + return +} + func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t test.Tester) { stochastic := func(x int) (success bool) { p, closer := persistenceMaker() @@ -408,14 +465,22 @@ func StochasticTests(persistenceMaker func() (MetricPersistence, test.Closer), t NewestInclusive: time.Unix(end, 0), } - samples, err := p.GetRangeValues(model.NewFingerprintFromMetric(metric), interval) - if err != nil { - t.Error(err) - return + samples := []model.SamplePair{} + fp := model.NewFingerprintFromMetric(metric) + switch persistence := p.(type) { + case *LevelDBMetricPersistence: + var err error + samples, err = levelDBGetRangeValues(persistence, fp, interval) + if err != nil { + t.Fatal(err) + return + } + default: + samples = p.GetRangeValues(fp, interval) } - if len(samples.Values) < 2 { - t.Errorf("expected sample count less than %d, got %d", 2, len(samples.Values)) + if len(samples) < 2 { + t.Errorf("expected sample count less than %d, got %d", 2, len(samples)) return } } diff --git a/storage/metric/view.go b/storage/metric/view.go index 3027328eb..79fd2cd88 100644 --- a/storage/metric/view.go +++ b/storage/metric/view.go @@ -15,7 +15,6 @@ package metric import ( "github.com/prometheus/prometheus/model" - "github.com/ryszard/goskiplist/skiplist" "sort" "time" ) @@ -102,128 +101,13 @@ func (v viewRequestBuilder) ScanJobs() (j scanJobs) { } type view struct { - fingerprintToSeries map[model.Fingerprint]viewStream + memorySeriesStorage } func (v view) appendSample(fingerprint model.Fingerprint, timestamp time.Time, value model.SampleValue) { - var ( - series, ok = v.fingerprintToSeries[fingerprint] - ) - - if !ok { - series = newViewStream() - v.fingerprintToSeries[fingerprint] = series - } - - series.add(timestamp, value) -} - -func (v view) Close() { - v.fingerprintToSeries = make(map[model.Fingerprint]viewStream) -} - -func (v view) GetValueAtTime(f model.Fingerprint, t time.Time) (samples []model.SamplePair) { - series, ok := v.fingerprintToSeries[f] - if !ok { - return - } - - iterator := series.values.Seek(skipListTime(t)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return - } - } - - defer iterator.Close() - - if iterator.Key() == nil || iterator.Value() == nil { - return - } - - samples = append(samples, model.SamplePair{ - Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(value).get(), - }) - - if iterator.Previous() { - samples = append(samples, model.SamplePair{ - Timestamp: time.Time(iterator.Key().(skipListTime)), - Value: iterator.Value().(value).get(), - }) - } - - return -} - -func (v view) GetBoundaryValues(f model.Fingerprint, i model.Interval) (first []model.SamplePair, second []model.SamplePair) { - first = v.GetValueAtTime(f, i.OldestInclusive) - second = v.GetValueAtTime(f, i.NewestInclusive) - return -} - -func (v view) GetRangeValues(f model.Fingerprint, i model.Interval) (samples []model.SamplePair) { - series, ok := v.fingerprintToSeries[f] - if !ok { - return - } - - iterator := series.values.Seek(skipListTime(i.OldestInclusive)) - if iterator == nil { - // If the iterator is nil, it means we seeked past the end of the series, - // so we seek to the last value instead. Due to the reverse ordering - // defined on skipListTime, this corresponds to the sample with the - // earliest timestamp. - iterator = series.values.SeekToLast() - if iterator == nil { - // The list is empty. - return - } - } - - for { - timestamp := time.Time(iterator.Key().(skipListTime)) - if timestamp.After(i.NewestInclusive) { - break - } - - if !timestamp.Before(i.OldestInclusive) { - samples = append(samples, model.SamplePair{ - Value: iterator.Value().(value).get(), - Timestamp: timestamp, - }) - } - - if !iterator.Previous() { - break - } - } - - return + v.appendSampleWithoutIndexing(fingerprint, timestamp, value) } func newView() view { - return view{ - fingerprintToSeries: make(map[model.Fingerprint]viewStream), - } -} - -type viewStream struct { - values *skiplist.SkipList -} - -func (s viewStream) add(timestamp time.Time, value model.SampleValue) { - s.values.Set(skipListTime(timestamp), singletonValue(value)) -} - -func newViewStream() viewStream { - return viewStream{ - values: skiplist.New(), - } + return view{NewMemorySeriesStorage()} } diff --git a/storage/metric/view_test.go b/storage/metric/view_test.go index 333b628e1..5335c3b06 100644 --- a/storage/metric/view_test.go +++ b/storage/metric/view_test.go @@ -54,7 +54,7 @@ func testBuilder(t test.Tester) { in in out out }{ - // // Ensure that the fingerprint is sorted in proper order. + // Ensure that the fingerprint is sorted in proper order. { in: in{ atTimes: []atTime{